diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java index aeec1f589d06..c89e7656cbbb 100644 --- a/api/src/main/java/org/apache/iceberg/Transaction.java +++ b/api/src/main/java/org/apache/iceberg/Transaction.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; @@ -172,4 +173,14 @@ default ManageSnapshots manageSnapshots() { * @throws CommitFailedException If the updates cannot be committed due to conflicts. */ void commitTransaction(); + + /** Rolls back any pending changes. */ + default void rollback() { + throw new UnsupportedOperationException("Rollback not supported"); + } + + /** Provides access to the pending changes that are about to be committed. */ + default List pendingUpdates() { + throw new UnsupportedOperationException("Pending Updates not supported"); + } } diff --git a/api/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java b/api/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java new file mode 100644 index 000000000000..5d623c6d5470 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +public interface CatalogTransaction { + + enum IsolationLevel { + + /** + * All reads that are being made will see the last committed values that existed when the table + * was loaded inside the catalog transaction. Will successfully commit only if the values + * updated by the transaction do not conflict with other concurrent updates.
+ *
+ * + *
<p>
Note that under SNAPSHOT isolation a write skew anomaly is acceptable and + * permitted. In a write skew anomaly, two transactions (T1 and T2) concurrently read an + * overlapping data set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1 + * updates V1, T2 updates V2), and finally concurrently commit, neither having seen the update + * performed by the other. + */ + SNAPSHOT, + + /** + * All reads that are being made will see the last committed values that existed when the table + * was loaded inside the catalog transaction. All tables participating in the transaction must + * be in the same state when committing compared to when the table was loaded first within the + * catalog transaction.
+ *
+ * + *
<p>
Note that a write skew anomaly is not possible under SERIALIZABLE isolation, where + * two transactions (T1 and T2) concurrently read an overlapping data set (e.g. values V1 and + * V2), concurrently make disjoint updates (e.g. T1 updates V1, T2 updates V2). This is because + * under SERIALIZABLE isolation either T1 or T2 would have to occur first and be visible to the + * other transaction. + */ + SERIALIZABLE; + } + + /** + * Performs an atomic commit of all the pending changes across multiple tables. Engine-specific + * implementations must ensure that all pending changes are applied atomically. + */ + void commitTransaction(); + + /** Rolls back any pending changes across tables. */ + void rollback(); + + /** + * Returns this catalog transaction as a {@link Catalog} API so that any actions that are called + * through this API are participating in this catalog transaction. + * + * @return This catalog transaction as a {@link Catalog} API. Any actions that are called through + * this API are participating in this catalog transaction. + */ + Catalog asCatalog(); + + /** + * Returns the current {@link IsolationLevel} for this transaction. + * + * @return The {@link IsolationLevel} for this transaction. + */ + IsolationLevel isolationLevel(); +} diff --git a/api/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java b/api/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java new file mode 100644 index 000000000000..114eff339b80 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import java.util.Set; +import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel; + +public interface SupportsCatalogTransactions { + + /** + * Get the set of {@link IsolationLevel}s supported by this Catalog. + * + * @return A set of isolation levels supported. + */ + Set supportedIsolationLevels(); + + /** + * Start a new {@link CatalogTransaction} with the given {@link IsolationLevel}. + * + * @param isolationLevel The isolation level to use. + * @return A new {@link CatalogTransaction}. + */ + CatalogTransaction startTransaction(IsolationLevel isolationLevel); +} diff --git a/core/src/main/java/org/apache/iceberg/BaseCatalogTransaction.java b/core/src/main/java/org/apache/iceberg/BaseCatalogTransaction.java new file mode 100644 index 000000000000..73cef741f096 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseCatalogTransaction.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.BaseTransaction.TransactionTable; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.CatalogTransaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; + +public abstract class BaseCatalogTransaction implements CatalogTransaction { + private final Map txByTable; + private final Map initiallyReadTableMetadata; + private final IsolationLevel isolationLevel; + private final BaseMetastoreCatalog origin; + private boolean hasCommitted = false; + + public BaseCatalogTransaction(BaseMetastoreCatalog origin, IsolationLevel isolationLevel) { + Preconditions.checkArgument(null != origin, "Invalid origin catalog: null"); + Preconditions.checkArgument(null != isolationLevel, "Invalid isolation level: null"); + this.origin = origin; + this.isolationLevel = isolationLevel; + this.txByTable = Maps.newConcurrentMap(); + this.initiallyReadTableMetadata = Maps.newConcurrentMap(); + } + + @Override + public void rollback() { + Tasks.foreach(txByTable.values()).run(Transaction::rollback); + txByTable.clear(); + initiallyReadTableMetadata.clear(); + } + + protected Map txByTable() { + return txByTable; + } + + protected Map initiallyReadTableMetadata() { + return 
initiallyReadTableMetadata; + } + + protected Catalog origin() { + return origin; + } + + protected boolean hasCommitted() { + return hasCommitted; + } + + public void setHasCommitted(boolean hasCommitted) { + this.hasCommitted = hasCommitted; + } + + private TableIdentifier identifierWithoutCatalog(String tableWithCatalog) { + if (tableWithCatalog.startsWith(origin.name() + ".")) { // strip only a leading "<catalog>." qualifier + return TableIdentifier.parse(tableWithCatalog.substring(origin.name().length() + 1)); + } + return TableIdentifier.parse(tableWithCatalog); + } + + @Override + public Catalog asCatalog() { + return new AsTransactionalCatalog(); + } + + private Optional txTable(TableIdentifier identifier) { + if (txByTable.containsKey(identifier)) { + return Optional.ofNullable(txByTable.get(identifier).table()); + } + + return Optional.empty(); + } + + private Transaction txForTable(Table table) { + return txByTable.computeIfAbsent( + identifierWithoutCatalog(table.name()), + k -> { + TableOperations operations = ((HasTableOperations) table).operations(); + return Transactions.newTransaction(table.name(), operations); + }); + } + + @Override + public IsolationLevel isolationLevel() { + return isolationLevel; + } + + public class AsTransactionalCatalog extends BaseMetastoreCatalog { + @Override + public Table loadTable(TableIdentifier identifier) { + Table table = + BaseCatalogTransaction.this + .txTable(identifier) + .orElseGet( + () -> { + Table loadTable = origin.loadTable(identifier); + + // we need to remember the very first version of table metadata that we read + if (IsolationLevel.SERIALIZABLE == isolationLevel()) { + initiallyReadTableMetadata.computeIfAbsent( + identifier, (ident) -> ((BaseTable) loadTable).operations().current()); + } + + return loadTable; + }); + + TableOperations tableOps = + table instanceof TransactionTable + ? 
((TransactionTable) table).operations() + : ((BaseTable) table).operations(); + return new TransactionalTable(table, tableOps); + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + Optional
txTable = BaseCatalogTransaction.this.txTable(tableIdentifier); + if (txTable.isPresent()) { + return ((TransactionTable) txTable.get()).operations(); + } + return origin.newTableOps(tableIdentifier); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + return origin.defaultWarehouseLocation(tableIdentifier); + } + + @Override + public List listTables(Namespace namespace) { + return origin.listTables(namespace); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + return origin.dropTable(identifier, purge); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + origin.renameTable(from, to); + } + } + + private class TransactionalTable extends BaseTable { + private final Table table; + + private TransactionalTable(Table table, TableOperations ops) { + super(ops, table.name()); + this.table = table; + } + + @Override + public UpdateSchema updateSchema() { + return txForTable(table).updateSchema(); + } + + @Override + public UpdatePartitionSpec updateSpec() { + return txForTable(table).updateSpec(); + } + + @Override + public UpdateProperties updateProperties() { + return txForTable(table).updateProperties(); + } + + @Override + public ReplaceSortOrder replaceSortOrder() { + return txForTable(table).replaceSortOrder(); + } + + @Override + public UpdateLocation updateLocation() { + return txForTable(table).updateLocation(); + } + + @Override + public AppendFiles newAppend() { + return txForTable(table).newAppend(); + } + + @Override + public AppendFiles newFastAppend() { + return txForTable(table).newFastAppend(); + } + + @Override + public RewriteFiles newRewrite() { + return txForTable(table).newRewrite(); + } + + @Override + public RewriteManifests rewriteManifests() { + return txForTable(table).rewriteManifests(); + } + + @Override + public OverwriteFiles newOverwrite() { + return txForTable(table).newOverwrite(); + } + + @Override + public 
RowDelta newRowDelta() { + return txForTable(table).newRowDelta(); + } + + @Override + public ReplacePartitions newReplacePartitions() { + return txForTable(table).newReplacePartitions(); + } + + @Override + public DeleteFiles newDelete() { + return txForTable(table).newDelete(); + } + + @Override + public UpdateStatistics updateStatistics() { + return txForTable(table).updateStatistics(); + } + + @Override + public ExpireSnapshots expireSnapshots() { + return txForTable(table).expireSnapshots(); + } + + @Override + public ManageSnapshots manageSnapshots() { + return txForTable(table).manageSnapshots(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 32d69cc98f75..395aa3caa4bd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -31,7 +31,7 @@ import org.apache.iceberg.expressions.Term; import org.apache.iceberg.util.Tasks; -public class BaseReplaceSortOrder implements ReplaceSortOrder { +public class BaseReplaceSortOrder implements ReplaceSortOrder, TableMetadataDiffAccess { private final TableOperations ops; private final SortOrder.Builder builder; private TableMetadata base; @@ -60,9 +60,7 @@ public void commit() { .run( taskOps -> { this.base = ops.refresh(); - SortOrder newOrder = apply(); - TableMetadata updated = base.replaceSortOrder(newOrder); - taskOps.commit(base, updated); + taskOps.commit(base, tableMetadataDiff().updated()); }); } @@ -77,4 +75,12 @@ public ReplaceSortOrder desc(Term term, NullOrder nullOrder) { builder.desc(term, nullOrder); return this; } + + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder() + .base(base) + .updated(base.replaceSortOrder(apply())) + .build(); + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index cef487931b0e..9d5d0a37265f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -43,6 +43,7 @@ import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.PropertyUtil; @@ -426,10 +427,10 @@ private void commitSimpleTransaction() { throw e; } catch (PendingUpdateFailedException e) { - cleanUpOnCommitFailure(); + rollback(); throw e.wrapped(); } catch (RuntimeException e) { - cleanUpOnCommitFailure(); + rollback(); throw e; } @@ -468,7 +469,8 @@ private void commitSimpleTransaction() { } } - private void cleanUpOnCommitFailure() { + @Override + public void rollback() { // the commit failed and no files were committed. clean up each update. 
Tasks.foreach(updates) .suppressFailureWhenFinished() @@ -486,6 +488,11 @@ private void cleanUpOnCommitFailure() { .run(ops.io()::deleteFile); } + @Override + public List pendingUpdates() { + return ImmutableList.copyOf(updates); + } + private void applyUpdates(TableOperations underlyingOps) { if (base != underlyingOps.refresh()) { // use refreshed the metadata diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index 02794da2e4de..878fc0baecb7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -41,7 +41,7 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; -class BaseUpdatePartitionSpec implements UpdatePartitionSpec { +class BaseUpdatePartitionSpec implements UpdatePartitionSpec, TableMetadataDiffAccess { private final TableOperations ops; private final TableMetadata base; private final int formatVersion; @@ -328,8 +328,7 @@ public PartitionSpec apply() { @Override public void commit() { - TableMetadata update = base.updatePartitionSpec(apply()); - ops.commit(base, update); + ops.commit(base, tableMetadataDiff().updated()); } private Pair> resolve(Term term) { @@ -396,6 +395,14 @@ private boolean isTimeTransform(PartitionField field) { return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE); } + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder() + .base(base) + .updated(base.updatePartitionSpec(apply())) + .build(); + } + private static class IsTimeTransform implements PartitionSpecVisitor { private static final IsTimeTransform INSTANCE = new IsTimeTransform(); diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java index 9dfcf8c52867..1eb7e3fae1ad 100644 --- 
a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import java.util.List; + class CommitCallbackTransaction implements Transaction { static Transaction addCallback(Transaction txn, Runnable callback) { return new CommitCallbackTransaction(txn, callback); @@ -121,4 +123,14 @@ public void commitTransaction() { wrapped.commitTransaction(); callback.run(); } + + @Override + public void rollback() { + wrapped.rollback(); + } + + @Override + public List pendingUpdates() { + return wrapped.pendingUpdates(); + } } diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 9168b84b4042..7e2bf139790a 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -35,7 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Tasks; -class PropertiesUpdate implements UpdateProperties { +class PropertiesUpdate implements UpdateProperties, TableMetadataDiffAccess { private final TableOperations ops; private final Map updates = Maps.newHashMap(); private final Set removals = Sets.newHashSet(); @@ -106,11 +106,16 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .run( - taskOps -> { - Map newProperties = apply(); - TableMetadata updated = base.replaceProperties(newProperties); - taskOps.commit(base, updated); - }); + .run(taskOps -> taskOps.commit(base, tableMetadataDiff().updated())); + } + + @Override + public TableMetadataDiff tableMetadataDiff() { + TableMetadata original = base; + Map newProperties = apply(); + return ImmutableTableMetadataDiff.builder() + .base(original) + 
.updated(base.replaceProperties(newProperties)) + .build(); } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index fa6fcdf41442..ee6ed5911d3c 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("UnnecessaryAnonymousClass") -class RemoveSnapshots implements ExpireSnapshots { +class RemoveSnapshots implements ExpireSnapshots, TableMetadataDiffAccess { private static final Logger LOG = LoggerFactory.getLogger(RemoveSnapshots.class); // Creates an executor service that runs each task in the thread that invokes execute/submit. @@ -303,8 +303,7 @@ public void commit() { .onlyRetryOn(CommitFailedException.class) .run( item -> { - TableMetadata updated = internalApply(); - ops.commit(base, updated); + ops.commit(base, tableMetadataDiff().updated()); }); LOG.info("Committed snapshot changes"); @@ -337,4 +336,12 @@ private void cleanExpiredSnapshots() { cleanupStrategy.cleanFiles(base, current); } + + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder() + .base(ops.current()) + .updated(internalApply()) + .build(); + } } diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index c4f879924379..bb1e3ed1bfb3 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; /** Schema evolution API implementation. 
*/ -class SchemaUpdate implements UpdateSchema { +class SchemaUpdate implements UpdateSchema, TableMetadataDiffAccess { private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class); private static final int TABLE_ROOT_ID = -1; @@ -430,16 +430,12 @@ private void internalMove(String name, Move move) { */ @Override public Schema apply() { - Schema newSchema = - applyChanges(schema, deletes, updates, adds, moves, identifierFieldNames, caseSensitive); - - return newSchema; + return applyChanges(schema, deletes, updates, adds, moves, identifierFieldNames, caseSensitive); } @Override public void commit() { - TableMetadata update = applyChangesToMetadata(base.updateSchema(apply(), lastColumnId)); - ops.commit(base, update); + ops.commit(base, tableMetadataDiff().updated()); } private int assignNewColumnId() { @@ -551,6 +547,14 @@ private static Schema applyChanges( return new Schema(struct.fields(), freshIdentifierFieldIds); } + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder() + .base(base) + .updated(applyChangesToMetadata(base.updateSchema(apply(), lastColumnId))) + .build(); + } + private static class ApplyChanges extends TypeUtil.SchemaVisitor { private final List deletes; private final Map updates; diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 148e4b8bc8be..edbe74c22ea0 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -30,7 +30,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.util.Tasks; -public class SetLocation implements UpdateLocation { +public class SetLocation implements UpdateLocation, TableMetadataDiffAccess { private final TableOperations ops; private String newLocation; @@ -61,6 +61,14 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, 
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); + .run(taskOps -> taskOps.commit(base, tableMetadataDiff().updated())); + } + + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder() + .base(ops.current()) + .updated(ops.current().updateLocation(newLocation)) + .build(); } } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 0f80b4e1f233..036eef23b3a6 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -40,7 +40,7 @@ *

This update is not exposed though the Table API. Instead, it is a package-private part of the * Transaction API intended for use in {@link ManageSnapshots}. */ -class SetSnapshotOperation implements PendingUpdate { +class SetSnapshotOperation implements PendingUpdate, TableMetadataDiffAccess { private final TableOperations ops; private TableMetadata base; @@ -117,12 +117,6 @@ public void commit() { .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { - Snapshot snapshot = apply(); - TableMetadata updated = - TableMetadata.buildFrom(base) - .setBranchSnapshot(snapshot.snapshotId(), SnapshotRef.MAIN_BRANCH) - .build(); - // Do commit this operation even if the metadata has not changed, as we need to // advance the hasLastOpCommited for the transaction's commit to work properly. // (Without any other operations in the transaction, the commitTransaction() call @@ -132,7 +126,7 @@ public void commit() { // this operation retries // to ensure that if a concurrent operation assigns the UUID, this operation will not // fail. 
- taskOps.commit(base, updated.withUUID()); + taskOps.commit(base, tableMetadataDiff().updated().withUUID()); }); } @@ -164,4 +158,15 @@ private static List currentAncestors(TableMetadata meta) { private static boolean isCurrentAncestor(TableMetadata meta, long snapshotId) { return currentAncestors(meta).contains(snapshotId); } + + @Override + public TableMetadataDiff tableMetadataDiff() { + TableMetadata original = base; + Snapshot snapshot = apply(); + TableMetadata updated = + TableMetadata.buildFrom(base) + .setBranchSnapshot(snapshot.snapshotId(), SnapshotRef.MAIN_BRANCH) + .build(); + return ImmutableTableMetadataDiff.builder().base(original).updated(updated).build(); + } } diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 41c7254d6cdc..c97baf8ab4a1 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -24,7 +24,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -public class SetStatistics implements UpdateStatistics { +public class SetStatistics implements UpdateStatistics, TableMetadataDiffAccess { private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); @@ -52,9 +52,7 @@ public List apply() { @Override public void commit() { - TableMetadata base = ops.current(); - TableMetadata newMetadata = internalApply(base); - ops.commit(base, newMetadata); + ops.commit(ops.current(), tableMetadataDiff().updated()); } private TableMetadata internalApply(TableMetadata base) { @@ -69,4 +67,12 @@ private TableMetadata internalApply(TableMetadata base) { }); return builder.build(); } + + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder() + .base(ops.current()) + .updated(internalApply(ops.current())) + .build(); + } } diff --git 
a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 4a7aa746315f..a4f7e26613dd 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("UnnecessaryAnonymousClass") -abstract class SnapshotProducer implements SnapshotUpdate { +abstract class SnapshotProducer implements SnapshotUpdate, TableMetadataDiffAccess { private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class); static final Set EMPTY_SET = Sets.newHashSet(); @@ -365,19 +365,7 @@ public void commit() { .countAttempts(commitMetrics().attempts()) .run( taskOps -> { - Snapshot newSnapshot = apply(); - newSnapshotId.set(newSnapshot.snapshotId()); - TableMetadata.Builder update = TableMetadata.buildFrom(base); - if (base.snapshot(newSnapshot.snapshotId()) != null) { - // this is a rollback operation - update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); - } else if (stageOnly) { - update.addSnapshot(newSnapshot); - } else { - update.setBranchSnapshot(newSnapshot, targetBranch); - } - - TableMetadata updated = update.build(); + TableMetadata updated = updatedMetadata(newSnapshotId::set).updated(); if (updated.changes().isEmpty()) { // do not commit if the metadata has not changed. 
for example, this may happen // when setting the current @@ -432,6 +420,28 @@ public void commit() { } } + @Override + public TableMetadataDiff tableMetadataDiff() { + return updatedMetadata(id -> {}); + } + + private TableMetadataDiff updatedMetadata(Consumer snapshotIdConsumer) { + TableMetadata original = base; + Snapshot newSnapshot = apply(); + snapshotIdConsumer.accept(newSnapshot.snapshotId()); + TableMetadata.Builder update = TableMetadata.buildFrom(base); + if (base.snapshot(newSnapshot.snapshotId()) != null) { + // this is a rollback operation + update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); + } else if (stageOnly) { + update.addSnapshot(newSnapshot); + } else { + update.setBranchSnapshot(newSnapshot, targetBranch); + } + + return ImmutableTableMetadataDiff.builder().base(original).updated(update.build()).build(); + } + private void notifyListeners() { try { Object event = updateEvent(); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataDiffAccess.java b/core/src/main/java/org/apache/iceberg/TableMetadataDiffAccess.java new file mode 100644 index 000000000000..25b5f1af8f46 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/TableMetadataDiffAccess.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.immutables.value.Value; + +/** + * This provides access to {@link TableMetadata} in its base version prior to applying any updates + * and also in its updated version. + */ +public interface TableMetadataDiffAccess { + + TableMetadataDiff tableMetadataDiff(); + + @Value.Immutable + interface TableMetadataDiff { + TableMetadata base(); + + TableMetadata updated(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java index b87bac2f014f..fdc2ad9acb3e 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java +++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java @@ -27,7 +27,8 @@ * ToDo: Add SetSnapshotOperation operations such as setCurrentSnapshot, rollBackTime, rollbackTo to * this class so that we can support those operations for refs. 
*/ -class UpdateSnapshotReferencesOperation implements PendingUpdate> { +class UpdateSnapshotReferencesOperation + implements PendingUpdate>, TableMetadataDiffAccess { private final TableOperations ops; private final Map updatedRefs; @@ -46,8 +47,7 @@ public Map apply() { @Override public void commit() { - TableMetadata updated = internalApply(); - ops.commit(base, updated); + ops.commit(base, tableMetadataDiff().updated()); } public UpdateSnapshotReferencesOperation createBranch(String name, long snapshotId) { @@ -205,4 +205,9 @@ private TableMetadata internalApply() { return updatedBuilder.build(); } + + @Override + public TableMetadataDiff tableMetadataDiff() { + return ImmutableTableMetadataDiff.builder().base(base).updated(internalApply()).build(); + } } diff --git a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java index d6ee4d345cfa..a711fb485e7c 100644 --- a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java @@ -28,8 +28,11 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.rest.RESTCatalogTransaction; +import org.apache.iceberg.rest.RESTSessionCatalog; public abstract class BaseSessionCatalog implements SessionCatalog { private final Cache catalogs = @@ -62,7 +65,7 @@ public T withContext(SessionContext context, Function task) { return task.apply(asCatalog(context)); } - public class AsCatalog implements Catalog, SupportsNamespaces { + public class AsCatalog implements Catalog, SupportsNamespaces, SupportsCatalogTransactions { private final 
SessionContext context; private AsCatalog(SessionContext context) { @@ -159,5 +162,27 @@ public boolean removeProperties(Namespace namespace, Set removals) { public boolean namespaceExists(Namespace namespace) { return BaseSessionCatalog.this.namespaceExists(context, namespace); } + + @Override + public Set supportedIsolationLevels() { + return ImmutableSet.of( + CatalogTransaction.IsolationLevel.SNAPSHOT, + CatalogTransaction.IsolationLevel.SERIALIZABLE); + } + + @Override + public CatalogTransaction startTransaction(CatalogTransaction.IsolationLevel isolationLevel) { + Preconditions.checkState( + BaseSessionCatalog.this instanceof RESTSessionCatalog, + "Only RESTSessionCatalog currently supports CatalogTransactions"); + Preconditions.checkState( + supportedIsolationLevels().contains(isolationLevel), + "Invalid isolation level %s. Supported isolation levels: %s", + isolationLevel, + supportedIsolationLevels()); + + return new RESTCatalogTransaction( + this, (RESTSessionCatalog) BaseSessionCatalog.this, context, isolationLevel); + } } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 8ecbfa5373fd..24a6ee0b17e3 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -41,7 +41,10 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.CatalogTransaction; +import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsCatalogTransactions; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; @@ -54,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; @@ -61,7 +65,10 @@ import org.slf4j.LoggerFactory; public class JdbcCatalog extends BaseMetastoreCatalog - implements Configurable, SupportsNamespaces, Closeable { + implements Configurable, + SupportsNamespaces, + Closeable, + SupportsCatalogTransactions { public static final String PROPERTY_PREFIX = "jdbc."; private static final String NAMESPACE_EXISTS_PROPERTY = "exists"; @@ -488,6 +495,26 @@ private int execute(Consumer sqlErrorHandler, String sql, String.. } } + @Override + public Set supportedIsolationLevels() { + return ImmutableSet.of(IsolationLevel.SNAPSHOT, IsolationLevel.SERIALIZABLE); + } + + @Override + public CatalogTransaction startTransaction(IsolationLevel isolationLevel) { + Preconditions.checkState( + supportedIsolationLevels().contains(isolationLevel), + "Invalid isolation level %s. 
Supported isolation levels: %s", + isolationLevel, + supportedIsolationLevels()); + + // there's no isolation between operations on the same connections, so use a new client pool + return new JdbcCatalogTransaction( + this, + isolationLevel, + new JdbcClientPool(properties().get(CatalogProperties.URI), properties())); + } + @FunctionalInterface interface RowProducer { R apply(ResultSet result) throws SQLException; diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalogTransaction.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalogTransaction.java new file mode 100644 index 000000000000..f71dd4e46a5d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalogTransaction.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.jdbc; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import org.apache.iceberg.BaseCatalogTransaction; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Tasks; + +public class JdbcCatalogTransaction extends BaseCatalogTransaction { + private final JdbcClientPool connections; + + public JdbcCatalogTransaction( + BaseMetastoreCatalog origin, IsolationLevel isolationLevel, JdbcClientPool connections) { + super(origin, isolationLevel); + this.connections = connections; + beginTransaction(); + } + + private void beginTransaction() { + try { + connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.BEGIN_TX)) { + return sql.execute(); + } + }); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void commitTransaction() { + Preconditions.checkState(!hasCommitted(), "Transaction has already committed changes"); + + try { + for (TableIdentifier readTable : initiallyReadTableMetadata().keySet()) { + // we need to check all read tables to determine whether they changed outside the catalog + // TX after we initially read them + if (IsolationLevel.SERIALIZABLE == isolationLevel()) { + TableMetadata currentTableMetadata = + ((BaseTable) origin().loadTable(readTable)).operations().current(); + + if (!currentTableMetadata + .metadataFileLocation() + .equals(initiallyReadTableMetadata().get(readTable).metadataFileLocation())) { + throw new ValidationException( + "%s isolation violation: Found table metadata updates to table '%s' 
after it was read", + isolationLevel(), readTable); + } + } + } + + // FIXME: it seems that these are partially updated and not rolled back anymore + Tasks.foreach(txByTable().values()).run(Transaction::commitTransaction); + + connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.COMMIT_TX)) { + return sql.execute(); + } + }); + + setHasCommitted(true); + } catch (CommitStateUnknownException e) { + throw e; + } catch (RuntimeException e) { + rollback(); + throw e; + } catch (SQLException | InterruptedException e) { + rollback(); + throw new RuntimeException(e); + } + } + + @Override + public void rollback() { + try { + super.rollback(); + connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.ROLLBACK_TX)) { + return sql.execute(); + } + }); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index 3ffa47d2ea68..be7dd5e8dd47 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -44,6 +44,10 @@ final class JdbcUtil { static final String METADATA_LOCATION = "metadata_location"; static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location"; + static final String BEGIN_TX = "BEGIN"; + static final String COMMIT_TX = "COMMIT"; + static final String ROLLBACK_TX = "ROLLBACK"; + static final String DO_COMMIT_SQL = "UPDATE " + CATALOG_TABLE_NAME diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index d297fc738317..7957d9d95cdb 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -51,6 +51,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.rest.requests.CommitTxRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; @@ -283,6 +284,21 @@ public static void renameTable(Catalog catalog, RenameTableRequest request) { catalog.renameTable(request.source(), request.destination()); } + public static void commitTransaction(Catalog catalog, CommitTxRequest request) { + for (CommitTxRequest.CommitTableRequest tableChange : request.tableChanges()) { + Table table = catalog.loadTable(tableChange.identifier()); + if (table instanceof BaseTable) { + TableOperations ops = ((BaseTable) table).operations(); + UpdateTableRequest updateTableRequest = + new UpdateTableRequest(tableChange.requirements(), tableChange.updates()); + TableMetadata finalMetadata = commit(ops, updateTableRequest); + // TODO: final metadata for each table should probably be sent back to the client + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + } + } + private static boolean isCreate(UpdateTableRequest request) { boolean isCreate = request.requirements().stream() diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 83a08b1a2cd3..1730f9568712 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -31,8 +31,11 @@ import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.CatalogTransaction; +import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel; import 
org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.SupportsCatalogTransactions; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; @@ -41,10 +44,15 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RESTCatalog - implements Catalog, SupportsNamespaces, Configurable, Closeable { + implements Catalog, + SupportsNamespaces, + Configurable, + Closeable, + SupportsCatalogTransactions { private final RESTSessionCatalog sessionCatalog; private final Catalog delegate; private final SupportsNamespaces nsDelegate; + private final SupportsCatalogTransactions catalogTxDelegate; public RESTCatalog() { this( @@ -62,6 +70,7 @@ public RESTCatalog( this.sessionCatalog = new RESTSessionCatalog(clientBuilder); this.delegate = sessionCatalog.asCatalog(context); this.nsDelegate = (SupportsNamespaces) delegate; + this.catalogTxDelegate = (SupportsCatalogTransactions) delegate; } @Override @@ -250,4 +259,14 @@ public void setConf(Configuration conf) { public void close() throws IOException { sessionCatalog.close(); } + + @Override + public Set supportedIsolationLevels() { + return catalogTxDelegate.supportedIsolationLevels(); + } + + @Override + public CatalogTransaction startTransaction(IsolationLevel isolationLevel) { + return catalogTxDelegate.startTransaction(isolationLevel); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogTransaction.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogTransaction.java new file mode 100644 index 000000000000..b9326dfff55e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogTransaction.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PendingUpdate; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.ReplaceSortOrder; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RewriteManifests; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataDiffAccess; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.Transactions; +import org.apache.iceberg.UpdateLocation; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.UpdateStatistics; +import 
org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.CatalogTransaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.util.Tasks; + +public class RESTCatalogTransaction implements CatalogTransaction { + private final Map txByTable; + private final Map initiallyReadTableMetadata; + private final IsolationLevel isolationLevel; + private final Catalog origin; + private final RESTSessionCatalog sessionCatalog; + private final SessionCatalog.SessionContext context; + private boolean hasCommitted = false; + + public RESTCatalogTransaction( + Catalog origin, + RESTSessionCatalog sessionCatalog, + SessionCatalog.SessionContext context, + IsolationLevel isolationLevel) { + Preconditions.checkArgument(null != origin, "Invalid origin catalog: null"); + Preconditions.checkArgument(null != sessionCatalog, "Invalid session catalog: null"); + Preconditions.checkArgument(null != context, "Invalid session context: null"); + Preconditions.checkArgument(null != isolationLevel, "Invalid isolation level: null"); + this.origin = origin; + this.isolationLevel = isolationLevel; + this.sessionCatalog = sessionCatalog; + this.context = context; + this.txByTable = Maps.newConcurrentMap(); + this.initiallyReadTableMetadata = Maps.newConcurrentMap(); + // TODO: we should probably have a TX ID for logging and other purposes + } + + @Override + public void commitTransaction() { + Preconditions.checkState(!hasCommitted, "Transaction has already committed changes"); + + try { + for (TableIdentifier readTable : 
initiallyReadTableMetadata.keySet()) { + // we need to check all read tables to determine whether they changed outside the catalog + // TX after we initially read them + if (IsolationLevel.SERIALIZABLE == isolationLevel) { + TableMetadata currentTableMetadata = + ((BaseTable) origin.loadTable(readTable)).operations().current(); + + if (!currentTableMetadata + .metadataFileLocation() + .equals(initiallyReadTableMetadata.get(readTable).metadataFileLocation())) { + throw new ValidationException( + "%s isolation violation: Found table metadata updates to table '%s' after it was read", + isolationLevel(), readTable); + } + } + } + + Map> pendingUpdatesByTable = Maps.newConcurrentMap(); + txByTable.forEach((key, value) -> pendingUpdatesByTable.put(key, value.pendingUpdates())); + Map updatesByTable = Maps.newHashMap(); + + for (TableIdentifier affectedTable : pendingUpdatesByTable.keySet()) { + List tableMetadataDiffs = + pendingUpdatesByTable.get(affectedTable).stream() + .filter(pendingUpdate -> pendingUpdate instanceof TableMetadataDiffAccess) + .map(pendingUpdate -> (TableMetadataDiffAccess) pendingUpdate) + .map(TableMetadataDiffAccess::tableMetadataDiff) + .collect(Collectors.toList()); + + // first one contains the base metadata to create requirements from + TableMetadataDiffAccess.TableMetadataDiff firstOne = tableMetadataDiffs.get(0); + // the last one contains all metadata updates + TableMetadataDiffAccess.TableMetadataDiff lastOne = + tableMetadataDiffs.get(tableMetadataDiffs.size() - 1); + + List metadataUpdates = lastOne.updated().changes(); + + UpdateTableRequest.Builder builder = UpdateTableRequest.builderFor(firstOne.base()); + metadataUpdates.forEach(builder::update); + UpdateTableRequest updateTableRequest = builder.build(); + updatesByTable.put(affectedTable, updateTableRequest); + } + + // TODO: what if a TX failed, should we make sure it can't be committed multiple times? + // TODO: should this be retryable? 
+ sessionCatalog.commitCatalogTransaction(context, updatesByTable); + + hasCommitted = true; + } catch (CommitStateUnknownException e) { + throw e; + } catch (RuntimeException e) { + rollback(); + throw e; + } + } + + @Override + public void rollback() { + Tasks.foreach(txByTable.values()).run(Transaction::rollback); + txByTable.clear(); + initiallyReadTableMetadata.clear(); + } + + @Override + public Catalog asCatalog() { + return new AsTransactionalCatalog(); + } + + @Override + public IsolationLevel isolationLevel() { + return isolationLevel; + } + + private Optional

txTable(TableIdentifier identifier) { + if (txByTable.containsKey(identifier)) { + return Optional.ofNullable(txByTable.get(identifier).table()); + } + + return Optional.empty(); + } + + /** + * We're using a {@link Transaction} per table so that we can keep track of pending changes for a + * particular table. + */ + private Transaction txForTable(Table table) { + return txByTable.computeIfAbsent( + identifierWithoutCatalog(table.name()), + k -> Transactions.newTransaction(table.name(), ((HasTableOperations) table).operations())); + } + + // TODO: this functionality should probably live somewhere else to be reusable + private TableIdentifier identifierWithoutCatalog(String tableWithCatalog) { + if (tableWithCatalog.startsWith(origin.name())) { + return TableIdentifier.parse(tableWithCatalog.replace(origin.name() + ".", "")); + } + return TableIdentifier.parse(tableWithCatalog); + } + + public class AsTransactionalCatalog implements Catalog { + @Override + public Table loadTable(TableIdentifier identifier) { + Table table = + RESTCatalogTransaction.this + .txTable(identifier) + .orElseGet( + () -> { + Table loadTable = origin.loadTable(identifier); + + // we need to remember the very first version of table metadata that we read + if (IsolationLevel.SERIALIZABLE == isolationLevel()) { + initiallyReadTableMetadata.computeIfAbsent( + identifier, (ident) -> ((BaseTable) loadTable).operations().current()); + } + + return loadTable; + }); + + TableOperations tableOps = + table instanceof BaseTransaction.TransactionTable + ? 
((BaseTransaction.TransactionTable) table).operations() + : ((BaseTable) table).operations(); + return new TransactionalTable(table, tableOps); + } + + @Override + public List listTables(Namespace namespace) { + return origin.listTables(namespace); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + return origin.dropTable(identifier, purge); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + origin.renameTable(from, to); + } + } + + private class TransactionalTable extends BaseTable { + private final Table table; + + private TransactionalTable(Table table, TableOperations ops) { + super(ops, table.name()); + this.table = table; + } + + @Override + public UpdateSchema updateSchema() { + return txForTable(table).updateSchema(); + } + + @Override + public UpdatePartitionSpec updateSpec() { + return txForTable(table).updateSpec(); + } + + @Override + public UpdateProperties updateProperties() { + return txForTable(table).updateProperties(); + } + + @Override + public ReplaceSortOrder replaceSortOrder() { + return txForTable(table).replaceSortOrder(); + } + + @Override + public UpdateLocation updateLocation() { + return txForTable(table).updateLocation(); + } + + @Override + public AppendFiles newAppend() { + return txForTable(table).newAppend(); + } + + @Override + public AppendFiles newFastAppend() { + return txForTable(table).newFastAppend(); + } + + @Override + public RewriteFiles newRewrite() { + return txForTable(table).newRewrite(); + } + + @Override + public RewriteManifests rewriteManifests() { + return txForTable(table).rewriteManifests(); + } + + @Override + public OverwriteFiles newOverwrite() { + return txForTable(table).newOverwrite(); + } + + @Override + public RowDelta newRowDelta() { + return txForTable(table).newRowDelta(); + } + + @Override + public ReplacePartitions newReplacePartitions() { + return txForTable(table).newReplacePartitions(); + } + + @Override + public 
DeleteFiles newDelete() { + return txForTable(table).newDelete(); + } + + @Override + public UpdateStatistics updateStatistics() { + return txForTable(table).updateStatistics(); + } + + @Override + public ExpireSnapshots expireSnapshots() { + return txForTable(table).expireSnapshots(); + } + + @Override + public ManageSnapshots manageSnapshots() { + return txForTable(table).manageSnapshots(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java index 9e17f50c530a..c59608ad93da 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java @@ -42,6 +42,9 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.requests.CommitTxRequest; +import org.apache.iceberg.rest.requests.CommitTxRequestParser; +import org.apache.iceberg.rest.requests.ImmutableCommitTxRequest; import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequestParser; @@ -83,7 +86,12 @@ public static void registerAll(ObjectMapper mapper) { .addDeserializer(ReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>()) .addSerializer(ImmutableReportMetricsRequest.class, new ReportMetricsRequestSerializer<>()) .addDeserializer( - ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>()); + ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>()) + .addSerializer(CommitTxRequest.class, new CommitTxRequestSerializer<>()) + .addSerializer(ImmutableCommitTxRequest.class, new CommitTxRequestSerializer<>()) + .addDeserializer(CommitTxRequest.class, new CommitTxRequestDeserializer<>()) + 
.addDeserializer(ImmutableCommitTxRequest.class, new CommitTxRequestDeserializer<>()); + mapper.registerModule(module); } @@ -280,4 +288,22 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce return (T) ReportMetricsRequestParser.fromJson(jsonNode); } } + + public static class CommitTxRequestSerializer + extends JsonSerializer { + @Override + public void serialize(T request, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + CommitTxRequestParser.toJson(request, gen); + } + } + + public static class CommitTxRequestDeserializer + extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext context) throws IOException { + JsonNode jsonNode = p.getCodec().readTree(p); + return (T) CommitTxRequestParser.fromJson(jsonNode); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index e21475980237..57da579dff23 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -66,11 +66,15 @@ import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession; +import org.apache.iceberg.rest.requests.CommitTxRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.ImmutableCommitTableRequest; +import org.apache.iceberg.rest.requests.ImmutableCommitTxRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import 
org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; @@ -871,4 +875,29 @@ private static Cache newSessionCache(Map pr (RemovalListener) (id, auth, cause) -> auth.stopRefreshing()) .build(); } + + void commitCatalogTransaction( + SessionContext context, Map updatesByTable) { + ImmutableCommitTxRequest.Builder builder = ImmutableCommitTxRequest.builder(); + updatesByTable.forEach( + (ident, update) -> + builder.addTableChanges( + ImmutableCommitTableRequest.builder() + .identifier(ident) + .requirements(update.requirements()) + .updates(update.updates()) + .build())); + CommitTxRequest commitTxRequest = builder.build(); + + client.post( + paths.commitTransaction(), + commitTxRequest, + null, + headers(context), + ErrorHandlers.tableCommitHandler()); + + // TODO: we also need to update table metadata after a successful commit + // -> the CommitTxResponse should contain all new TableMetadata objects + + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java index 6fa09f33d2ba..9684aea2f661 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java +++ b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java @@ -85,4 +85,12 @@ public String metrics(TableIdentifier identifier) { RESTUtil.encodeString(identifier.name()), "metrics"); } + + public String startTransaction() { + return SLASH.join("v1", prefix, "startTransaction"); + } + + public String commitTransaction() { + return SLASH.join("v1", prefix, "commitTransaction"); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/CommitTxRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTxRequest.java new file mode 100644 index 000000000000..f9db8ef901b3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTxRequest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import java.util.List; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.RESTRequest; +import org.immutables.value.Value; + +@Value.Immutable +public interface CommitTxRequest extends RESTRequest { + List tableChanges(); + + @Override + default void validate() { + check(); + } + + @Value.Check + default void check() { + Preconditions.checkArgument(!tableChanges().isEmpty(), "Table changes must be provided"); + } + + @Value.Immutable + interface CommitTableRequest { + TableIdentifier identifier(); + + List requirements(); + + List updates(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/CommitTxRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTxRequestParser.java new file mode 100644 index 000000000000..f8f2b0af1d1c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/CommitTxRequestParser.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.MetadataUpdateParser; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +/** JSON writer and reader for CommitTxRequest. The JSON is a single object with one field per table change, named by identifier().toString(), whose value holds "identifier", "requirements" and "updates". NOTE(review): TABLE_CHANGES is declared but not referenced anywhere in this class. */ public class CommitTxRequestParser { + private static final String TABLE_CHANGES = "table-changes"; + private static final String UPDATES = "updates"; + private static final String REQUIREMENTS = "requirements"; + private static final String IDENTIFIER = "identifier"; + + private CommitTxRequestParser() {} + + public static String toJson(CommitTxRequest request) { + return toJson(request, false); + } + + public static String toJson(CommitTxRequest request, boolean pretty) { + return JsonUtil.generate(gen -> toJson(request, gen), pretty); + } + + /** Writes request to gen: an outer object containing, per table change, an object field named after the identifier string with the identifier, a requirements array and an updates array. The Preconditions check rejects a null request. */ public static void toJson(CommitTxRequest request, JsonGenerator gen) throws IOException { + Preconditions.checkArgument(null != request, "Invalid commit tx request: null"); + + gen.writeStartObject(); + + for 
(CommitTxRequest.CommitTableRequest tableChange : request.tableChanges()) { + gen.writeObjectFieldStart(tableChange.identifier().toString()); + gen.writeFieldName(IDENTIFIER); + TableIdentifierParser.toJson(tableChange.identifier(), gen); + + gen.writeArrayFieldStart(REQUIREMENTS); + for (UpdateTableRequest.UpdateRequirement updateRequirement : tableChange.requirements()) { + UpdateRequirementParser.toJson(updateRequirement, gen); + } + gen.writeEndArray(); + + gen.writeArrayFieldStart(UPDATES); + for (MetadataUpdate metadataUpdate : tableChange.updates()) { + MetadataUpdateParser.toJson(metadataUpdate, gen); + } + gen.writeEndArray(); + + gen.writeEndObject(); + } + + gen.writeEndObject(); + } + + public static CommitTxRequest fromJson(String json) { + return JsonUtil.parse(json, CommitTxRequestParser::fromJson); + } + + /** Builds a CommitTxRequest from a JSON object. json must be non-null and an object; for every top-level field only the field value is read (identifier, requirements, updates) while the field name is ignored, and both requirements and updates must be arrays. */ public static CommitTxRequest fromJson(JsonNode json) { + Preconditions.checkArgument(null != json, "Cannot parse commit tx request from null object"); + Preconditions.checkArgument( + json.isObject(), "Cannot parse commit tx request from non-object: %s", json); + + ImmutableCommitTxRequest.Builder builder = ImmutableCommitTxRequest.builder(); + json.fields() + .forEachRemaining( + node -> { + ImmutableCommitTableRequest.Builder build = ImmutableCommitTableRequest.builder(); + TableIdentifier identifier = + TableIdentifierParser.fromJson(JsonUtil.get(IDENTIFIER, node.getValue())); + build.identifier(identifier); + + JsonNode requirements = JsonUtil.get(REQUIREMENTS, node.getValue()); + Preconditions.checkArgument( + requirements.isArray(), + "Cannot parse requirements from non-array: %s", + requirements); + requirements.forEach( + req -> build.addRequirements(UpdateRequirementParser.fromJson(req))); + + JsonNode updates = JsonUtil.get(UPDATES, node.getValue()); + Preconditions.checkArgument( + updates.isArray(), "Cannot parse metadata updates from non-array: %s", updates); + + updates.forEach(update -> 
build.addUpdates(MetadataUpdateParser.fromJson(update))); + builder.addTableChanges(build.build()); + }); + + return builder.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/StartTxRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/StartTxRequest.java new file mode 100644 index 000000000000..5df5088f990f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/StartTxRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.requests; + +import org.apache.iceberg.rest.RESTRequest; +import org.immutables.value.Value; + +/** REST request without any fields of its own; validate() has nothing to check. */ @Value.Immutable +public interface StartTxRequest extends RESTRequest { + + @Override + default void validate() { + // nothing to do + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index c7d36bdc0078..b45eb239ab0b 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.function.Function; import org.apache.iceberg.DataFile; import org.apache.iceberg.HistoryEntry; @@ -331,6 +332,15 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) { * timestamp */ public static long snapshotIdAsOfTime(Table table, long timestampMillis) { + return snapshotIdAsOfTimeOptional(table, timestampMillis) + .orElseThrow( + () -> + new IllegalArgumentException( + "Cannot find a snapshot older than " + + DateTimeUtil.formatTimestampMillis(timestampMillis))); + } + + /** Optional-returning counterpart of snapshotIdAsOfTime: returns Optional.ofNullable of the id picked by the scan over table.history() instead of throwing when none was picked. The assignment inside the loop lies outside this hunk. */ public static Optional snapshotIdAsOfTimeOptional(Table table, long timestampMillis) { Long snapshotId = null; for (HistoryEntry logEntry : table.history()) { if (logEntry.timestampMillis() <= timestampMillis) { @@ -338,11 +348,7 @@ public static long snapshotIdAsOfTime(Table table, long timestampMillis) { } } - Preconditions.checkArgument( - snapshotId != null, - "Cannot find a snapshot older than %s", - DateTimeUtil.formatTimestampMillis(timestampMillis)); - return snapshotId; + return Optional.ofNullable(snapshotId); } /** diff --git a/core/src/test/java/org/apache/iceberg/CatalogTransactionTests.java b/core/src/test/java/org/apache/iceberg/CatalogTransactionTests.java new file mode 100644 index 000000000000..c9fd533e26f8 --- /dev/null +++ 
b/core/src/test/java/org/apache/iceberg/CatalogTransactionTests.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE; +import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.CatalogTransaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsCatalogTransactions; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; 
+import org.junit.jupiter.api.io.TempDir; + +/** Shared catalog-transaction scenarios. Subclasses supply the catalog under test through catalog(); every scenario is run once with SNAPSHOT and once with SERIALIZABLE isolation. */ public abstract class CatalogTransactionTests< + C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> { + + @TempDir protected Path metadataDir; + + protected static final Schema SCHEMA = + new Schema( + required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get())); + + // Partition spec used to create tables + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + + protected static final DataFile FILE_A = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + protected static final DataFile FILE_B = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=1") // easy way to set partition data for now + .withRecordCount(1) + .build(); + protected static final DataFile FILE_C = + DataFiles.builder(SPEC) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=2") // easy way to set partition data for now + .withRecordCount(1) + .build(); + protected static final DataFile FILE_D = + DataFiles.builder(SPEC) + .withPath("/path/to/data-d.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=3") // easy way to set partition data for now + .withRecordCount(1) + .build(); + + protected abstract C catalog(); + + @Test + public void catalogTxWithSingleOp() { + catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT); + } + + @Test + public void catalogTxWithSingleOpWithSerializable() { + catalogTxWithSingleOp(SERIALIZABLE); + } + + /** Appends two files through the transaction catalog and checks that the table metadata only changes once commitTransaction() has been called. */ private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) { + TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op"); + catalog().createTable(identifier, SCHEMA, SPEC); + + 
Table one = catalog().loadTable(identifier); + TableMetadata base = ((BaseTable) one).operations().current(); + + CatalogTransaction catalogTransaction = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + assertThat(base).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(base.currentSnapshot()).isNull(); + + catalogTransaction.commitTransaction(); + + TableMetadata updated = ((BaseTable) one).operations().refresh(); + assertThat(base).isNotSameAs(updated); + assertThat(base.lastUpdatedMillis()).isLessThan(updated.lastUpdatedMillis()); + + assertThat(updated.currentSnapshot().addedDataFiles(catalog().loadTable(identifier).io())) + .hasSize(2); + } + + @Test + public void txAgainstMultipleTables() { + txAgainstMultipleTables(SNAPSHOT); + } + + @Test + public void txAgainstMultipleTablesWithSerializable() { + txAgainstMultipleTables(SERIALIZABLE); + } + + /** Stages deletes and appends on tables a, b and c inside one catalog transaction; none of the tables has a snapshot before commitTransaction() and each has at least one afterwards. */ private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) { + List tables = Arrays.asList("a", "b", "c"); + for (String tbl : tables) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + TableIdentifier third = TableIdentifier.of("ns", "c"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + Table three = catalog().loadTable(third); + + TableMetadata baseMetadataOne = ((BaseTable) one).operations().current(); + TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current(); + TableMetadata baseMetadataThree = ((BaseTable) three).operations().current(); + + CatalogTransaction catalogTransaction = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + + 
txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + + txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit(); + txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + + txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit(); + txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit(); + + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh()); + + for (String tbl : tables) { + TableMetadata current = + ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh(); + assertThat(current.snapshots()).isEmpty(); + } + + catalogTransaction.commitTransaction(); + + for (String tbl : tables) { + TableMetadata current = + ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh(); + assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1); + } + + one = catalog().loadTable(first); + two = catalog().loadTable(second); + three = catalog().loadTable(third); + assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1); + assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1); + assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1); + + assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2); + assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2); + assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1); + } + + @Test + public void txAgainstMultipleTablesLastOneSchemaConflict() { + txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT); + } + + @Test + public void 
txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() { + txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE); + } + + /** Renames column "data" of table c inside the transaction while the same column is deleted outside of it; commitTransaction() is then expected to throw and, for catalogs other than JdbcCatalog, tables a and b must stay unchanged. */ private void txAgainstMultipleTablesLastOneSchemaConflict( + CatalogTransaction.IsolationLevel isolationLevel) { + for (String tbl : Arrays.asList("a", "b", "c")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + TableIdentifier third = TableIdentifier.of("ns", "c"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + Table three = catalog().loadTable(third); + + TableMetadata baseMetadataOne = ((BaseTable) one).operations().current(); + TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current(); + TableMetadata baseMetadataThree = ((BaseTable) three).operations().current(); + + CatalogTransaction catalogTransaction = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + + txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit(); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + + txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit(); + txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit(); + + txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit(); + + assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh()); + + // delete the column we're trying to rename in the catalog TX + three.updateSchema().deleteColumn("data").commit(); + + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + 
assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull(); + + if (SERIALIZABLE == isolationLevel) { + Assertions.assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read"); + } else { + String failureMsg = "Requirement failed: current schema changed: expected id 0 != 1"; + if (catalog() instanceof JdbcCatalog) { + failureMsg = "Table metadata refresh is required"; + } + + Assertions.assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining(failureMsg); + } + + if (catalog() instanceof JdbcCatalog) { + // FIXME: partial updates are being applied due to how SQLite manages nested transactions and + // how isolation is determined among connections and transactions + // that means that we might not want to do a POC for JdbcCatalogTransaction + } else { + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column")) + .isNull(); + assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull(); + assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1); + } + } + + @Test + public void txAgainstMultipleTablesLastOneFails() { + txAgainstMultipleTablesLastOneFails(SNAPSHOT); + } + + @Test + public void txAgainstMultipleTablesLastOneFailsWithSerializable() { + txAgainstMultipleTablesLastOneFails(SERIALIZABLE); + 
} + + /** Appends to table c outside the transaction after c was modified inside it; commitTransaction() is expected to throw, except under SNAPSHOT with a JdbcCatalog where it currently commits. For catalogs other than JdbcCatalog, tables a and b must end up without a snapshot. */ private void txAgainstMultipleTablesLastOneFails( + CatalogTransaction.IsolationLevel isolationLevel) { + for (String tbl : Arrays.asList("a", "b", "c")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + TableIdentifier third = TableIdentifier.of("ns", "c"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + Table three = catalog().loadTable(third); + + TableMetadata baseMetadataOne = ((BaseTable) one).operations().current(); + TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current(); + TableMetadata baseMetadataThree = ((BaseTable) three).operations().current(); + + CatalogTransaction catalogTransaction = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + + txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit(); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + + txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit(); + txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit(); + + assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh()); + + // perform updates outside the catalog TX + three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot(); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + + 
assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + + if (SERIALIZABLE == isolationLevel) { + Assertions.assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read"); + } else { + if (catalog() instanceof JdbcCatalog) { + // FIXME: why does this not fail for SQLite? + catalogTransaction.commitTransaction(); + } else { + Assertions.assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Requirement failed: branch main was created concurrently"); + } + } + + if (catalog() instanceof JdbcCatalog) { + // FIXME: why does the commit not fail with SQLite? + } else { + // the third update in the catalog TX fails, so we need to make sure that all changes from the + // catalog TX are rolled back + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + + assertThat(((BaseTable) one).operations().refresh().currentSnapshot()).isNull(); + assertThat(((BaseTable) two).operations().refresh().currentSnapshot()).isNull(); + assertThat(((BaseTable) three).operations().refresh().currentSnapshot()).isEqualTo(snapshot); + } + } + + @Test + public void schemaUpdateVisibility() { + schemaUpdateVisibility(CatalogTransaction.IsolationLevel.SNAPSHOT); + } + + @Test + public void schemaUpdateVisibilityWithSerializable() { + schemaUpdateVisibility(SERIALIZABLE); + } + + /** Adds a column inside the transaction: it must be visible through txCatalog right away and through catalog() only after commitTransaction(). */ private void schemaUpdateVisibility(CatalogTransaction.IsolationLevel isolationLevel) { + Namespace 
namespace = Namespace.of("test"); + TableIdentifier identifier = TableIdentifier.of(namespace, "table"); + + catalog().createNamespace(namespace); + catalog().createTable(identifier, SCHEMA); + assertThat(catalog().tableExists(identifier)).isTrue(); + + CatalogTransaction catalogTx = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTx.asCatalog(); + + String column = "new_col"; + + assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNull(); + txCatalog + .loadTable(identifier) + .updateSchema() + .addColumn(column, Types.BooleanType.get()) + .commit(); + + // changes inside the catalog TX should be visible + assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull(); + + // changes outside the catalog TX should not be visible + assertThat(catalog().loadTable(identifier).schema().findField(column)).isNull(); + + catalogTx.commitTransaction(); + + assertThat(catalog().loadTable(identifier).schema().findField(column)).isNotNull(); + assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull(); + } + + @Test + public void readTableAfterLoadTableInsideTx() { + readTableAfterLoadTableInsideTx(SNAPSHOT); + } + + @Test + public void readTableAfterLoadTableInsideTxWithSerializable() { + readTableAfterLoadTableInsideTx(SERIALIZABLE); + } + + /** Table b is read inside the transaction and afterwards appended to outside of it: with SERIALIZABLE the commit must fail and leave table a empty, with SNAPSHOT the commit must succeed. */ private void readTableAfterLoadTableInsideTx(CatalogTransaction.IsolationLevel isolationLevel) { + for (String tbl : Arrays.asList("a", "b")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + Table two = catalog().loadTable(second); + + CatalogTransaction catalogTransaction = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + 
assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(2); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0); + + two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit(); + + // this should not be allowed with SERIALIZABLE after the table has been already read + // within the catalog TX, but is allowed with SNAPSHOT + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(3); + + if (SERIALIZABLE == isolationLevel) { + Assertions.assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessage( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.b' after it was read"); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(0); + assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3); + } else { + catalogTransaction.commitTransaction(); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2); + assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3); + } + } + + @Test + public void concurrentTx() { + concurrentTx(SNAPSHOT); + } + + @Test + public void concurrentTxWithSerializable() { + concurrentTx(SERIALIZABLE); + } + + /** The table is appended to outside the transaction before it is first loaded inside it; the commit is expected to succeed and the final snapshot to report 4 total data files. */ private void concurrentTx(CatalogTransaction.IsolationLevel isolationLevel) { + TableIdentifier identifier = TableIdentifier.of("ns", "tbl"); + catalog().createTable(identifier, SCHEMA); + Table one = catalog().loadTable(identifier); + + CatalogTransaction catalogTransaction = catalog().startTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + + // perform updates outside catalog TX but before table has been read inside the catalog TX + one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + + Snapshot snapshot = ((BaseTable) 
one).operations().refresh().currentSnapshot(); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("2"); + + // this should not fail with any isolation level + txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + catalogTransaction.commitTransaction(); + + snapshot = ((BaseTable) one).operations().refresh().currentSnapshot(); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("4"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalogTransaction.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalogTransaction.java new file mode 100644 index 000000000000..bf9a3bba8045 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalogTransaction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogTransactionTests; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +/** Runs the CatalogTransactionTests scenarios against a JdbcCatalog. before() creates a fresh catalog per test, pointing at an in-memory SQLite URI with a random suffix and using metadataDir as the warehouse location; after() closes it. */ public class TestJdbcCatalogTransaction extends CatalogTransactionTests { + private JdbcCatalog catalog; + + @Override + protected JdbcCatalog catalog() { + return catalog; + } + + @BeforeEach + public void before() { + catalog = new JdbcCatalog(); + + String sqliteDb = + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + catalog.setConf(new Configuration()); + catalog.initialize( + "jdbc-catalog", + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, + metadataDir.toFile().getAbsolutePath(), + CatalogProperties.URI, + sqliteDb, + JdbcCatalog.PROPERTY_PREFIX + "username", + "user", + JdbcCatalog.PROPERTY_PREFIX + "password", + "password")); + } + + @AfterEach + public void after() { + if (null != catalog) { + catalog.close(); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index c6d41818441c..ae4667ccf417 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -40,6 +40,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.requests.CommitTxRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; 
import org.apache.iceberg.rest.requests.RenameTableRequest; @@ -130,7 +131,8 @@ enum Route { HTTPMethod.POST, "v1/namespaces/{namespace}/tables/{table}/metrics", ReportMetricsRequest.class, - null); + null), + COMMIT_TX(HTTPMethod.POST, "v1/commitTransaction", CommitTxRequest.class, null); private final HTTPMethod method; private final int requiredLength; @@ -357,6 +359,13 @@ public T handleRequest( return null; } + case COMMIT_TX: + { + CommitTxRequest commitTxRequest = castRequest(CommitTxRequest.class, body); + CatalogHandlers.commitTransaction(catalog, commitTxRequest); + return null; + } + default: } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogTransaction.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogTransaction.java new file mode 100644 index 000000000000..5ed82ff2e90d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogTransaction.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest; + +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogTransactionTests; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public class TestRESTCatalogTransaction extends CatalogTransactionTests { + private RESTCatalog catalog; + private JdbcCatalog jdbcCatalog; + private Server httpServer; + + @Override + protected RESTCatalog catalog() { + return catalog; + } + + @BeforeEach + public void before() throws Exception { + jdbcCatalog = new JdbcCatalog(); + + String sqliteDb = + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + jdbcCatalog.setConf(new Configuration()); + jdbcCatalog.initialize( + "jdbc-catalog", + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, + metadataDir.toFile().getAbsolutePath(), + CatalogProperties.URI, + sqliteDb, + JdbcCatalog.PROPERTY_PREFIX + "username", + "user", + JdbcCatalog.PROPERTY_PREFIX + "password", + "password")); + + RESTCatalogAdapter adaptor = new RESTCatalogAdapter(jdbcCatalog); + + RESTCatalogServlet servlet = new RESTCatalogServlet(adaptor); + ServletContextHandler servletContext = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContext.setContextPath("/"); + ServletHolder servletHolder = new ServletHolder(servlet); + servletHolder.setInitParameter("javax.ws.rs.Application", "ServiceListPublic"); + servletContext.addServlet(servletHolder, "/*"); + servletContext.setVirtualHosts(null); + servletContext.setGzipHandler(new 
GzipHandler()); + + this.httpServer = new Server(0); + httpServer.setHandler(servletContext); + httpServer.start(); + + SessionCatalog.SessionContext context = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()); + + this.catalog = + new RESTCatalog( + context, + (config) -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build()); + catalog.setConf(new Configuration()); + catalog.initialize( + "prod", + ImmutableMap.of( + CatalogProperties.URI, httpServer.getURI().toString(), "credential", "catalog:12345")); + } + + @AfterEach + public void closeCatalog() throws Exception { + if (null != catalog) { + catalog.close(); + } + + if (null != jdbcCatalog) { + jdbcCatalog.close(); + } + + if (null != httpServer) { + httpServer.stop(); + httpServer.join(); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTxRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTxRequestParser.java new file mode 100644 index 000000000000..17980e724d5c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTxRequestParser.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.requests; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.UpdateTableRequest.UpdateRequirement; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class TestCommitTxRequestParser { + + @Test + public void nullAndEmptyCheck() { + Assertions.assertThatThrownBy(() -> CommitTxRequestParser.toJson(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid commit tx request: null"); + + Assertions.assertThatThrownBy(() -> CommitTxRequestParser.fromJson((JsonNode) null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse commit tx request from null object"); + } + + @Test + public void missingFields() {} + + @Test + public void invalidTableIdentifier() {} + + @Test + public void invalidRequirements() {} + + @Test + public void invalidMetadataUpdates() {} + + @Test + public void roundTripSerde() { + String uuid = "2cc52516-5e73-41f2-b139-545d41a4e151"; + CommitTxRequest.CommitTableRequest commitTableRequestOne = + ImmutableCommitTableRequest.builder() + .identifier(TableIdentifier.of("ns1", "table1")) + .addRequirements(new UpdateRequirement.AssertTableUUID(uuid)) + .addRequirements(new UpdateRequirement.AssertTableDoesNotExist()) + .addUpdates(new MetadataUpdate.AssignUUID(uuid)) + .addUpdates(new MetadataUpdate.SetCurrentSchema(23)) + .build(); + + CommitTxRequest.CommitTableRequest commitTableRequestTwo = + ImmutableCommitTableRequest.builder() + .identifier(TableIdentifier.of("ns1", "table2")) + .addRequirements(new UpdateRequirement.AssertDefaultSpecID(4)) + .addRequirements(new UpdateRequirement.AssertCurrentSchemaID(24)) + .addUpdates(new MetadataUpdate.RemoveSnapshot(101L)) + .addUpdates(new 
MetadataUpdate.SetCurrentSchema(25)) + .build(); + + CommitTxRequest commitTxRequest = + ImmutableCommitTxRequest.builder() + .addTableChanges(commitTableRequestOne, commitTableRequestTwo) + .build(); + + String expectedJson = + "{\n" + + " \"ns1.table1\" : {\n" + + " \"identifier\" : {\n" + + " \"namespace\" : [ \"ns1\" ],\n" + + " \"name\" : \"table1\"\n" + + " },\n" + + " \"requirements\" : [ {\n" + + " \"type\" : \"assert-table-uuid\",\n" + + " \"uuid\" : \"2cc52516-5e73-41f2-b139-545d41a4e151\"\n" + + " }, {\n" + + " \"type\" : \"assert-create\"\n" + + " } ],\n" + + " \"updates\" : [ {\n" + + " \"action\" : \"assign-uuid\",\n" + + " \"uuid\" : \"2cc52516-5e73-41f2-b139-545d41a4e151\"\n" + + " }, {\n" + + " \"action\" : \"set-current-schema\",\n" + + " \"schema-id\" : 23\n" + + " } ]\n" + + " },\n" + + " \"ns1.table2\" : {\n" + + " \"identifier\" : {\n" + + " \"namespace\" : [ \"ns1\" ],\n" + + " \"name\" : \"table2\"\n" + + " },\n" + + " \"requirements\" : [ {\n" + + " \"type\" : \"assert-default-spec-id\",\n" + + " \"default-spec-id\" : 4\n" + + " }, {\n" + + " \"type\" : \"assert-current-schema-id\",\n" + + " \"current-schema-id\" : 24\n" + + " } ],\n" + + " \"updates\" : [ {\n" + + " \"action\" : \"remove-snapshots\",\n" + + " \"snapshot-ids\" : [ 101 ]\n" + + " }, {\n" + + " \"action\" : \"set-current-schema\",\n" + + " \"schema-id\" : 25\n" + + " } ]\n" + + " }\n" + + "}"; + + String json = CommitTxRequestParser.toJson(commitTxRequest, true); + Assertions.assertThat(json).isEqualTo(expectedJson); + + // we can't do an equality comparison on CommitTxRequest because updates/requirements don't + // implement equals/hashcode + Assertions.assertThat(CommitTxRequestParser.toJson(CommitTxRequestParser.fromJson(json), true)) + .isEqualTo(expectedJson); + } +}