diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index 322c48bc9ab0..e9dd109d3ad4 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -106,7 +106,7 @@ protected Schema tableSchema() { return schema; } - protected TableScanContext context() { + public TableScanContext context() { return context; } diff --git a/core/src/main/java/org/apache/iceberg/catalog/BaseCatalogTransaction.java b/core/src/main/java/org/apache/iceberg/catalog/BaseCatalogTransaction.java new file mode 100644 index 000000000000..742c3bbb82e1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/catalog/BaseCatalogTransaction.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.DataTableScan; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.ReplaceSortOrder; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RewriteManifests; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.Transactions; +import org.apache.iceberg.UpdateLocation; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.UpdateStatistics; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.RESTCatalog; +import org.immutables.value.Value; + +public class BaseCatalogTransaction implements CatalogTransaction { + private final Map txByTable; + private final Map initiallyReadTableMetadataByRef; + private final Map initiallyReadTables; + private final IsolationLevel isolationLevel; + private final Catalog origin; + private boolean hasCommitted = false; + + public BaseCatalogTransaction(Catalog origin, IsolationLevel isolationLevel) { + Preconditions.checkArgument(null != origin, "Invalid origin catalog: null"); + Preconditions.checkArgument(null != isolationLevel, "Invalid isolation level: null"); + Preconditions.checkArgument( + origin instanceof SupportsCatalogTransactions, + "Origin catalog does not support catalog transactions"); + this.origin = origin; + this.isolationLevel = isolationLevel; + this.txByTable = Maps.newHashMap(); + this.initiallyReadTableMetadataByRef = Maps.newHashMap(); + this.initiallyReadTables = Maps.newHashMap(); + } + + @Override + public void commitTransaction() { + Preconditions.checkState(!hasCommitted, "Transaction has already committed changes"); + + try { + // write skew is not possible in read-only transactions, so only perform that check if there + // were any pending updates + if (hasUpdates()) { + validateSerializableIsolation(); + } + + List tableCommits = + txByTable.entrySet().stream() + .filter(e -> e.getValue() instanceof BaseTransaction) + .map( + e -> + TableCommit.create( + e.getKey(), + ((BaseTransaction) e.getValue()).startMetadata(), + ((BaseTransaction) e.getValue()).currentMetadata())) + .collect(Collectors.toList()); + + // TODO: should this be retryable? + // only commit if there were change + if (!tableCommits.isEmpty()) { + // TODO: remove this cast once commitTransaction(..) is defined at the Catalog level + ((RESTCatalog) origin).commitTransaction(tableCommits); + } + + // TODO: we should probably be refreshing metadata from all affected tables after the TX + // committed (alternatively to this approach we could load each affected table from the origin + // catalog + // txByTable.values().stream() + // .filter(tx -> tx instanceof BaseTransaction) + // .map(tx -> (BaseTransaction) tx) + // .forEach(tx -> tx.underlyingOps().refresh()); + + hasCommitted = true; + } catch (CommitStateUnknownException e) { + throw e; + } catch (RuntimeException e) { + rollback(); + throw e; + } + } + + private boolean hasUpdates() { + return txByTable.values().stream() + .filter(tx -> tx instanceof BaseTransaction) + .map(tx -> (BaseTransaction) tx) + .anyMatch(tx -> !tx.currentMetadata().changes().isEmpty()); + } + + /** + * With SERIALIZABLE isolation we mainly need to check that write skew is not possible. Write skew + * happens due to a transaction taking action based on an outdated premise (a fact that was true + * when a table was initially loaded but then changed due to a concurrent update to the table + * while this TX was in-progress). When this TX wants to commit, the original premise might not + * hold anymore, thus we need to check whether the {@link org.apache.iceberg.Snapshot} a branch + * was pointing to changed after it was initially read inside this TX. If no information of a + * branch's snapshot is available, we check whether {@link TableMetadata} changed after it was + * initially read. + */ + private void validateSerializableIsolation() { + for (TableRef readTable : initiallyReadTableMetadataByRef.keySet()) { + // check all read tables to determine whether they changed outside the catalog + // TX after they were initially read on a particular branch + if (IsolationLevel.SERIALIZABLE == isolationLevel) { + BaseTable table = (BaseTable) origin.loadTable(readTable.identifier()); + SnapshotRef snapshotRef = table.operations().current().ref(readTable.ref()); + SnapshotRef snapshotRefInsideTx = + initiallyReadTableMetadataByRef.get(readTable).ref(readTable.ref()); + + if (null != snapshotRef + && null != snapshotRefInsideTx + && snapshotRef.snapshotId() != snapshotRefInsideTx.snapshotId()) { + throw new ValidationException( + "%s isolation violation: Found table metadata updates to table '%s' after it was read on branch '%s'", + isolationLevel(), readTable.identifier().toString(), readTable.ref()); + } + + if (null == snapshotRef || null == snapshotRefInsideTx) { + TableMetadata currentTableMetadata = table.operations().current(); + + if (!currentTableMetadata + .metadataFileLocation() + .equals(initiallyReadTableMetadataByRef.get(readTable).metadataFileLocation())) { + throw new ValidationException( + "%s isolation violation: Found table metadata updates to table '%s' after it was read", + isolationLevel(), readTable.identifier()); + } + } + } + } + } + + private void rollback() { + txByTable.clear(); + initiallyReadTableMetadataByRef.clear(); + initiallyReadTables.clear(); + } + + @Override + public Catalog asCatalog() { + return new AsTransactionalCatalog(); + } + + @Override + public IsolationLevel isolationLevel() { + return isolationLevel; + } + + private Optional txTable(TableIdentifier identifier) { + if (txByTable.containsKey(identifier)) { + return Optional.ofNullable(txByTable.get(identifier).table()); + } + + return Optional.empty(); + } + + /** We need to keep track of the tables that we read inside the TX to prevent read skew */ + private Optional
initiallyReadTable(TableIdentifier identifier) { + if (initiallyReadTables.containsKey(identifier)) { + return Optional.ofNullable(initiallyReadTables.get(identifier)); + } + + return Optional.empty(); + } + + /** + * We're using a {@link Transaction} per table so that we can keep track of pending changes for a + * particular table. + */ + private Transaction txForTable(Table table) { + return txByTable.computeIfAbsent( + identifierWithoutCatalog(table.name()), + k -> Transactions.newTransaction(table.name(), ((HasTableOperations) table).operations())); + } + + // TODO: this functionality should probably live somewhere else to be reusable + private TableIdentifier identifierWithoutCatalog(String tableWithCatalog) { + if (tableWithCatalog.startsWith(origin.name())) { + return TableIdentifier.parse(tableWithCatalog.replace(origin.name() + ".", "")); + } + return TableIdentifier.parse(tableWithCatalog); + } + + public class AsTransactionalCatalog implements Catalog { + @Override + public Table loadTable(TableIdentifier identifier) { + Table table = + BaseCatalogTransaction.this + .txTable(identifier) + .orElseGet( + () -> + initiallyReadTable(identifier) + .orElseGet( + () -> { + Table loadTable = origin.loadTable(identifier); + + initiallyReadTables.computeIfAbsent(identifier, ident -> loadTable); + + // remember the very first version of table metadata that was read + if (IsolationLevel.SERIALIZABLE == isolationLevel()) { + initiallyReadTableMetadataByRef.computeIfAbsent( + ImmutableTableRef.builder() + .identifier(identifier) + .ref(SnapshotRef.MAIN_BRANCH) + .build(), + ident -> opsFromTable(loadTable).current()); + } + + return loadTable; + })); + + return new TransactionalTable(table, opsFromTable(table)); + } + + @Override + public List listTables(Namespace namespace) { + return origin.listTables(namespace); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + return origin.dropTable(identifier, purge); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + origin.renameTable(from, to); + } + } + + private static TableOperations opsFromTable(Table table) { + return table instanceof BaseTransaction.TransactionTable + ? ((BaseTransaction.TransactionTable) table).operations() + : ((BaseTable) table).operations(); + } + + private class TransactionalTable extends BaseTable { + private final Table table; + + private TransactionalTable(Table table, TableOperations ops) { + super(ops, table.name()); + this.table = table; + } + + @Override + public TableScan newScan() { + TableScan tableScan = super.newScan(); + if (tableScan instanceof DataTableScan) { + return new TransactionalTableScan((DataTableScan) tableScan); + } + + return tableScan; + } + + @Override + public UpdateSchema updateSchema() { + return txForTable(table).updateSchema(); + } + + @Override + public UpdatePartitionSpec updateSpec() { + return txForTable(table).updateSpec(); + } + + @Override + public UpdateProperties updateProperties() { + return txForTable(table).updateProperties(); + } + + @Override + public ReplaceSortOrder replaceSortOrder() { + return txForTable(table).replaceSortOrder(); + } + + @Override + public UpdateLocation updateLocation() { + return txForTable(table).updateLocation(); + } + + @Override + public AppendFiles newAppend() { + return txForTable(table).newAppend(); + } + + @Override + public AppendFiles newFastAppend() { + return txForTable(table).newFastAppend(); + } + + @Override + public RewriteFiles newRewrite() { + return txForTable(table).newRewrite(); + } + + @Override + public RewriteManifests rewriteManifests() { + return txForTable(table).rewriteManifests(); + } + + @Override + public OverwriteFiles newOverwrite() { + return txForTable(table).newOverwrite(); + } + + @Override + public RowDelta newRowDelta() { + return txForTable(table).newRowDelta(); + } + + @Override + public ReplacePartitions newReplacePartitions() { + return txForTable(table).newReplacePartitions(); + } + + @Override + public DeleteFiles newDelete() { + return txForTable(table).newDelete(); + } + + @Override + public UpdateStatistics updateStatistics() { + return txForTable(table).updateStatistics(); + } + + @Override + public ExpireSnapshots expireSnapshots() { + return txForTable(table).expireSnapshots(); + } + + @Override + public ManageSnapshots manageSnapshots() { + return txForTable(table).manageSnapshots(); + } + } + + private class TransactionalTableScan extends DataTableScan { + protected TransactionalTableScan(DataTableScan delegate) { + super(delegate.table(), delegate.schema(), delegate.context()); + } + + @Override + public TableScan useRef(String name) { + DataTableScan tableScan = (DataTableScan) super.useRef(name); + + if (IsolationLevel.SERIALIZABLE == isolationLevel()) { + // store which version of the table on the given branch we read the first time + initiallyReadTableMetadataByRef.computeIfAbsent( + ImmutableTableRef.builder() + .identifier(identifierWithoutCatalog(table().name())) + .ref(name) + .build(), + ident -> opsFromTable(table()).current()); + } + + return tableScan; + } + } + + @Value.Immutable + interface TableRef { + TableIdentifier identifier(); + + String ref(); + + @Value.Lazy + default String name() { + return identifier().toString() + "@" + ref(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java b/core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java new file mode 100644 index 000000000000..b909b66461b5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +public interface CatalogTransaction { + + enum IsolationLevel { + + /** + * All reads that are being made will see the last committed values that existed when the table + * was loaded first inside the catalog transaction. Subsequent changes to a table that happened + * outside the catalog transaction after the table was read will not be seen to prevent read + * skew (reading a table multiple times within the catalog transaction should always return + * the same results).
+ *
+ * Will successfully commit only if the values updated by the transaction do not conflict with + * other concurrent updates.
+ *
+ * + *

Note that under SNAPSHOT isolation a write skew anomaly is acceptable and + * permitted. In a write skew anomaly, two transactions (T1 and T2) concurrently read an + * overlapping data set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1 + * updates V1, T2 updates V2), and finally concurrently commit, neither having seen the update + * performed by the other. + */ + SNAPSHOT, + + /** + * All reads that are being made will see the last committed values that existed when the table + * was loaded first inside the catalog transaction. Subsequent changes to a table that happened + * outside the catalog transaction after the table was read will not be seen to prevent read + * skew.
+ *
+ * All tables participating in the transaction must be in the same state when committing + * compared to when the table was loaded first within the catalog transaction.
+ *
+ * + *

Note that a write skew anomaly is not possible under SERIALIZABLE isolation, where + * two transactions (T1 and T2) concurrently read an overlapping data set (e.g. values V1 and + * V2), concurrently make disjoint updates (e.g. T1 updates V1, T2 updates V2). This is because + * under SERIALIZABLE isolation either T1 or T2 would have to occur first and be visible to the + * other transaction. + */ + SERIALIZABLE; + } + + /** + * Performs an atomic commit of all the pending changes across multiple tables. Engine-specific + * implementations must ensure that all pending changes are applied atomically. + */ + void commitTransaction(); + + /** + * Returns this catalog transaction as a {@link Catalog} API so that any actions that are called + * through this API are participating in this catalog transaction. + * + * @return This catalog transaction as a {@link Catalog} API. Any actions that are called through + * this API are participating in this catalog transaction. + */ + Catalog asCatalog(); + + /** + * Returns the current {@link IsolationLevel} for this transaction. + * + * @return The {@link IsolationLevel} for this transaction. + */ + IsolationLevel isolationLevel(); +} diff --git a/core/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java b/core/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java new file mode 100644 index 000000000000..d87f74d39cb3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel; + +public interface SupportsCatalogTransactions { + + /** + * Create a new {@link CatalogTransaction} with the given {@link IsolationLevel}. + * + * @param isolationLevel The isolation level to use. + * @return A new {@link CatalogTransaction}. + */ + CatalogTransaction createTransaction(IsolationLevel isolationLevel); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 63b660c46aa3..9efb43a08224 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -29,9 +29,12 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.BaseCatalogTransaction; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.CatalogTransaction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.SupportsCatalogTransactions; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; @@ -41,7 +44,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable, Closeable { +public class RESTCatalog + implements Catalog, + SupportsNamespaces, + Configurable, + Closeable, + SupportsCatalogTransactions { private final RESTSessionCatalog sessionCatalog; private final Catalog delegate; private final SupportsNamespaces nsDelegate; @@ -253,6 +261,12 @@ public void close() throws IOException { sessionCatalog.close(); } + /** + * This performs an atomic multi-table swap for the given {@link TableCommit} instances. + * + * @param commits The {@link TableCommit} instances containing the changes to be atomically + * applied across multiple tables. + */ public void commitTransaction(List commits) { sessionCatalog.commitTransaction(context, commits); } @@ -261,4 +275,9 @@ public void commitTransaction(TableCommit... commits) { sessionCatalog.commitTransaction( context, ImmutableList.builder().add(commits).build()); } + + @Override + public CatalogTransaction createTransaction(CatalogTransaction.IsolationLevel isolationLevel) { + return new BaseCatalogTransaction(this, isolationLevel); + } } diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java new file mode 100644 index 000000000000..fc4ccbdea756 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java @@ -0,0 +1,762 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE; +import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TestCatalogUtil; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public abstract class CatalogTransactionTests< + C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> { + + @TempDir protected Path metadataDir; + + protected static final Schema SCHEMA = + new Schema( + required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get())); + + // Partition spec used to create tables + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + + protected static final DataFile FILE_A = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + protected static final DataFile FILE_B = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=1") // easy way to set partition data for now + .withRecordCount(1) + .build(); + protected static final DataFile FILE_C = + DataFiles.builder(SPEC) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=2") // easy way to set partition data for now + .withRecordCount(1) + .build(); + protected static final DataFile FILE_D = + DataFiles.builder(SPEC) + .withPath("/path/to/data-d.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=3") // easy way to set partition data for now + .withRecordCount(1) + .build(); + + protected abstract C catalog(); + + @Test + public void testNulls() { + assertThatThrownBy(() -> new BaseCatalogTransaction(null, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid origin catalog: null"); + + assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid isolation level: null"); + } + + @Test + public void catalogTransactionSupport() { + assertThatThrownBy( + () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Origin catalog does not support catalog transactions"); + } + + @Test + public void multipleCommits() { + CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE); + catalogTx.commitTransaction(); + assertThatThrownBy(catalogTx::commitTransaction) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Transaction has already committed changes"); + } + + @Test + public void invalidIsolationLevel() { + assertThatThrownBy(() -> catalog().createTransaction(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid isolation level: null"); + } + + @Test + public void catalogTxWithSingleOp() { + catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT); + } + + @Test + public void catalogTxWithSingleOpWithSerializable() { + catalogTxWithSingleOp(SERIALIZABLE); + } + + private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) { + TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op"); + catalog().createNamespace(identifier.namespace()); + catalog().createTable(identifier, SCHEMA, SPEC); + + Table one = catalog().loadTable(identifier); + TableMetadata base = ((BaseTable) one).operations().current(); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + assertThat(base).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(base.currentSnapshot()).isNull(); + + catalogTransaction.commitTransaction(); + + TableMetadata updated = ((BaseTable) one).operations().refresh(); + assertThat(base).isNotSameAs(updated); + + Snapshot snapshot = updated.currentSnapshot(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + } + + @Test + public void txAgainstMultipleTables() { + txAgainstMultipleTables(SNAPSHOT); + } + + @Test + public void txAgainstMultipleTablesWithSerializable() { + txAgainstMultipleTables(SERIALIZABLE); + } + + private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) { + catalog().createNamespace(Namespace.of("ns")); + + List tables = Arrays.asList("a", "b", "c"); + for (String tbl : tables) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + TableIdentifier third = TableIdentifier.of("ns", "c"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + Table three = catalog().loadTable(third); + + TableMetadata baseMetadataOne = ((BaseTable) one).operations().current(); + TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current(); + TableMetadata baseMetadataThree = ((BaseTable) three).operations().current(); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + + txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + + txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit(); + txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + + txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit(); + txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit(); + + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh()); + + for (String tbl : tables) { + TableMetadata current = + ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh(); + assertThat(current.snapshots()).isEmpty(); + } + + catalogTransaction.commitTransaction(); + + for (String tbl : tables) { + TableMetadata current = + ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh(); + assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1); + } + + one = catalog().loadTable(first); + two = catalog().loadTable(second); + three = catalog().loadTable(third); + assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1); + assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1); + assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1); + + assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2); + assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2); + assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1); + } + + @Test + public void txAgainstMultipleTablesLastOneSchemaConflict() { + txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT); + } + + @Test + public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() { + txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE); + } + + private void txAgainstMultipleTablesLastOneSchemaConflict( + CatalogTransaction.IsolationLevel isolationLevel) { + catalog().createNamespace(Namespace.of("ns")); + + for (String tbl : Arrays.asList("a", "b", "c")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + TableIdentifier third = TableIdentifier.of("ns", "c"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + Table three = catalog().loadTable(third); + + TableMetadata baseMetadataOne = ((BaseTable) one).operations().current(); + TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current(); + TableMetadata baseMetadataThree = ((BaseTable) three).operations().current(); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + + txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit(); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + + txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit(); + txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit(); + + txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit(); + + assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh()); + + // delete the colum we're trying to rename in the catalog TX + three.updateSchema().deleteColumn("data").commit(); + + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull(); + + if (SERIALIZABLE == isolationLevel) { + assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read"); + } else { + assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1"); + } + + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column")) + .isNull(); + assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull(); + assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1); + } + + @Test + public void txAgainstMultipleTablesLastOneFails() { + txAgainstMultipleTablesLastOneFails(SNAPSHOT); + } + + @Test + public void txAgainstMultipleTablesLastOneFailsWithSerializable() { + txAgainstMultipleTablesLastOneFails(SERIALIZABLE); + } + + private void txAgainstMultipleTablesLastOneFails( + CatalogTransaction.IsolationLevel isolationLevel) { + catalog().createNamespace(Namespace.of("ns")); + + for (String tbl : Arrays.asList("a", "b", "c")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + TableIdentifier third = TableIdentifier.of("ns", "c"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + Table three = catalog().loadTable(third); + + TableMetadata baseMetadataOne = ((BaseTable) one).operations().current(); + TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current(); + TableMetadata baseMetadataThree = ((BaseTable) three).operations().current(); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + + txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit(); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + + txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit(); + txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit(); + + assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh()); + + // perform updates outside the catalog TX + three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot(); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + + if (SERIALIZABLE == isolationLevel) { + assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read"); + } else { + assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Requirement failed: branch main was created concurrently"); + } + + // the third update in the catalog TX fails, so we need to make sure that all changes from the + // catalog TX are rolled back + assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh()); + assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh()); + assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh()); + + assertThat(((BaseTable) one).operations().refresh().currentSnapshot()).isNull(); + assertThat(((BaseTable) two).operations().refresh().currentSnapshot()).isNull(); + assertThat(((BaseTable) three).operations().refresh().currentSnapshot()).isEqualTo(snapshot); + } + + @Test + public void schemaUpdateVisibility() { + schemaUpdateVisibility(CatalogTransaction.IsolationLevel.SNAPSHOT); + } + + @Test + public void schemaUpdateVisibilityWithSerializable() { + schemaUpdateVisibility(SERIALIZABLE); + } + + private void schemaUpdateVisibility(CatalogTransaction.IsolationLevel isolationLevel) { + Namespace namespace = Namespace.of("test"); + TableIdentifier identifier = TableIdentifier.of(namespace, "table"); + + catalog().createNamespace(namespace); + catalog().createTable(identifier, SCHEMA); + assertThat(catalog().tableExists(identifier)).isTrue(); + + CatalogTransaction catalogTx = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTx.asCatalog(); + + String column = "new_col"; + + assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNull(); + txCatalog + .loadTable(identifier) + .updateSchema() + .addColumn(column, Types.BooleanType.get()) + .commit(); + + // changes inside the catalog TX should be visible + assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull(); + + // changes outside the catalog TX should not be visible + assertThat(catalog().loadTable(identifier).schema().findField(column)).isNull(); + + catalogTx.commitTransaction(); + + assertThat(catalog().loadTable(identifier).schema().findField(column)).isNotNull(); + assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull(); + } + + @Test + public void readTableAfterLoadTableInsideTx() { + readTableAfterLoadTableInsideTx(SNAPSHOT); + } + + @Test + public void readTableAfterLoadTableInsideTxWithSerializable() { + readTableAfterLoadTableInsideTx(SERIALIZABLE); + } + + private void readTableAfterLoadTableInsideTx(CatalogTransaction.IsolationLevel isolationLevel) { + catalog().createNamespace(Namespace.of("ns")); + + for (String tbl : Arrays.asList("a", "b")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + Table two = catalog().loadTable(second); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(2); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0); + + two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit(); + + // this should not be allowed with SERIALIZABLE after the table has been already read + // within the catalog TX, but is allowed with SNAPSHOT + // catalog TX should still the version of the table it initially read (with 0 files) + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0); + + if (SERIALIZABLE == isolationLevel) { + assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessage( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.b' after it was read"); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(0); + assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3); + } else { + catalogTransaction.commitTransaction(); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2); + assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3); + } + } + + @Test + public void concurrentTx() { + concurrentTx(SNAPSHOT); + } + + @Test + public void concurrentTxWithSerializable() { + concurrentTx(SERIALIZABLE); + } + + private void concurrentTx(CatalogTransaction.IsolationLevel isolationLevel) { + TableIdentifier identifier = TableIdentifier.of("ns", "tbl"); + catalog().createNamespace(identifier.namespace()); + catalog().createTable(identifier, SCHEMA); + Table one = catalog().loadTable(identifier); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + + // perform updates outside catalog TX but before table has been read inside the catalog TX + one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + + Snapshot snapshot = ((BaseTable) one).operations().refresh().currentSnapshot(); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("2"); + + // this should not fail with any isolation level + txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + catalogTransaction.commitTransaction(); + + snapshot = ((BaseTable) one).operations().refresh().currentSnapshot(); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("4"); + } + + @Test + public void readOnlyTxWithSerializableShouldNotFail() { + catalog().createNamespace(Namespace.of("ns")); + + for (String tbl : Arrays.asList("a", "b")) { + catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + + CatalogTransaction catalogTransaction = catalog().createTransaction(SERIALIZABLE); + Catalog txCatalog = catalogTransaction.asCatalog(); + + assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(0); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0); + + // changes happen outside the catalog TX + one.newFastAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit(); + + // catalog TX should still the version of the table it initially read (with 0 files) + assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(0); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0); + + // this ends up being a read-only TX, thus no write skew can happen, and it shouldn't fail + catalogTransaction.commitTransaction(); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2); + assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3); + } + + @Test + public void readOnlyTxWithSerializableOnBranchShouldNotFail() { + catalog().createNamespace(Namespace.of("ns")); + + String branch = "branch"; + for (String tbl : Arrays.asList("a", "b")) { + Table table = catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit(); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + Table one = catalog().loadTable(first); + Table two = catalog().loadTable(second); + + CatalogTransaction catalogTransaction = catalog().createTransaction(SERIALIZABLE); + Catalog txCatalog = catalogTransaction.asCatalog(); + + assertThat(Iterables.size(txCatalog.loadTable(first).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + + // changes happen outside the catalog TX + one.newFastAppend().appendFile(FILE_A).appendFile(FILE_D).toBranch(branch).commit(); + two.newFastAppend() + .appendFile(FILE_B) + .appendFile(FILE_C) + .appendFile(FILE_D) + .toBranch(branch) + .commit(); + + // catalog TX should still the version of the table it initially read (with 0 files) + assertThat(Iterables.size(txCatalog.loadTable(first).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + + // this ends up being a read-only TX, thus no write skew can happen, and it shouldn't fail + catalogTransaction.commitTransaction(); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().useRef(branch).planFiles())) + .isEqualTo(3); + assertThat(Iterables.size(catalog().loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(4); + } + + @Test + public void concurrentTxOnBranch() { + concurrentTxOnBranch(SNAPSHOT); + } + + @Test + public void concurrentTxOnBranchWithSerializable() { + concurrentTxOnBranch(SERIALIZABLE); + } + + private void concurrentTxOnBranch(CatalogTransaction.IsolationLevel isolationLevel) { + String branch = "branch"; + TableIdentifier identifier = TableIdentifier.of("ns", "tbl"); + + catalog().createNamespace(identifier.namespace()); + + Table one = catalog().createTable(identifier, SCHEMA); + one.newFastAppend().appendFile(FILE_A).commit(); + one.manageSnapshots().createBranch(branch, one.currentSnapshot().snapshotId()).commit(); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + + // perform updates outside catalog TX but before table has been read inside the catalog TX + one.newAppend().appendFile(FILE_C).appendFile(FILE_D).toBranch(branch).commit(); + + TableMetadata metadata = ((BaseTable) one).operations().refresh(); + Snapshot snapshotOnBranch = metadata.snapshot(metadata.ref(branch).snapshotId()); + assertThat(snapshotOnBranch).isNotNull(); + assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + assertThat(snapshotOnBranch.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)) + .isEqualTo("3"); + + // this should not fail with any isolation level + txCatalog + .loadTable(identifier) + .newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .toBranch(branch) + .commit(); + + catalogTransaction.commitTransaction(); + + metadata = ((BaseTable) one).operations().refresh(); + snapshotOnBranch = metadata.snapshot(metadata.ref(branch).snapshotId()); + assertThat(snapshotOnBranch).isNotNull(); + assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2"); + assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2"); + assertThat(snapshotOnBranch.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)) + .isEqualTo("5"); + } + + @Test + public void readTableAfterLoadTableInsideTxOnBranch() { + readTableAfterLoadTableInsideTxOnBranch(SNAPSHOT); + } + + @Test + public void readTableAfterLoadTableInsideTxOnBranchWithSerializable() { + readTableAfterLoadTableInsideTxOnBranch(SERIALIZABLE); + } + + private void readTableAfterLoadTableInsideTxOnBranch( + CatalogTransaction.IsolationLevel isolationLevel) { + catalog().createNamespace(Namespace.of("ns")); + String branch = "branch"; + for (String tbl : Arrays.asList("a", "b")) { + Table table = catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA); + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit(); + } + + TableIdentifier first = TableIdentifier.of("ns", "a"); + TableIdentifier second = TableIdentifier.of("ns", "b"); + Table two = catalog().loadTable(second); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + + txCatalog.loadTable(first).newAppend().appendFile(FILE_D).toBranch(branch).commit(); + assertThat(Iterables.size(txCatalog.loadTable(first).newScan().useRef(branch).planFiles())) + .isEqualTo(2); + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + + two.newFastAppend().appendFile(FILE_B).toBranch(branch).commit(); + + // this should not be allowed with SERIALIZABLE after the table has been already read + // within the catalog TX, but is allowed with SNAPSHOT + // catalog TX should still the version of the table it initially read (with 1 file) + assertThat(Iterables.size(txCatalog.loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + + if (SERIALIZABLE == isolationLevel) { + assertThatThrownBy(catalogTransaction::commitTransaction) + .isInstanceOf(ValidationException.class) + .hasMessage( + "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.a' after it was read on branch 'branch'"); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().useRef(branch).planFiles())) + .isEqualTo(1); + assertThat(Iterables.size(catalog().loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(2); + } else { + catalogTransaction.commitTransaction(); + + assertThat(Iterables.size(catalog().loadTable(first).newScan().useRef(branch).planFiles())) + .isEqualTo(2); + assertThat(Iterables.size(catalog().loadTable(second).newScan().useRef(branch).planFiles())) + .isEqualTo(2); + } + } + + @Test + public void txAgainstDifferentBranches() { + txAgainstDifferentBranchesWithSerializable(SNAPSHOT); + } + + @Test + public void txAgainstDifferentBranchesWithSerializable() { + txAgainstDifferentBranchesWithSerializable(SERIALIZABLE); + } + + private void txAgainstDifferentBranchesWithSerializable( + CatalogTransaction.IsolationLevel isolationLevel) { + TableIdentifier identifier = TableIdentifier.of("ns", "table"); + catalog().createNamespace(identifier.namespace()); + // TODO: use format-version 1 here due to an issue with REST + concurrent table replace + // => Cannot add snapshot with sequence number 2 older than last sequence number 2 + catalog() + .createTable( + identifier, + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of("format-version", "1")); + String branchA = "branchA"; + String branchB = "branchB"; + + Table table = catalog().loadTable(identifier); + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createBranch(branchA, table.currentSnapshot().snapshotId()).commit(); + + CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel); + Catalog txCatalog = catalogTransaction.asCatalog(); + txCatalog + .loadTable(identifier) + .manageSnapshots() + .createBranch(branchB, table.currentSnapshot().snapshotId()) + .commit(); + txCatalog.loadTable(identifier).newFastAppend().appendFile(FILE_D).toBranch(branchB).commit(); + + table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).toBranch(branchA).commit(); + + catalogTransaction.commitTransaction(); + + assertThat( + Iterables.size(catalog().loadTable(identifier).newScan().useRef(branchA).planFiles())) + .isEqualTo(3); + + assertThat( + Iterables.size(catalog().loadTable(identifier).newScan().useRef(branchB).planFiles())) + .isEqualTo(2); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogTransaction.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogTransaction.java new file mode 100644 index 000000000000..ecbbf2b6de0f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalogTransaction.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.CatalogTransactionTests; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +public class TestRESTCatalogTransaction extends CatalogTransactionTests { + private RESTCatalog catalog; + private InMemoryCatalog backendCatalog; + private Server httpServer; + + @Override + protected RESTCatalog catalog() { + return catalog; + } + + @BeforeEach + public void before() throws Exception { + this.backendCatalog = new InMemoryCatalog(); + this.backendCatalog.initialize( + "in-memory", + ImmutableMap.of( + CatalogProperties.WAREHOUSE_LOCATION, metadataDir.toFile().getAbsolutePath())); + + RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog); + + RESTCatalogServlet servlet = new RESTCatalogServlet(adaptor); + ServletContextHandler servletContext = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContext.setContextPath("/"); + ServletHolder servletHolder = new ServletHolder(servlet); + servletHolder.setInitParameter("javax.ws.rs.Application", "ServiceListPublic"); + servletContext.addServlet(servletHolder, "/*"); + servletContext.setVirtualHosts(null); + servletContext.setGzipHandler(new GzipHandler()); + + this.httpServer = new Server(0); + httpServer.setHandler(servletContext); + httpServer.start(); + + SessionCatalog.SessionContext context = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()); + + this.catalog = + new RESTCatalog( + context, + (config) -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()); + catalog.setConf(new Configuration()); + catalog.initialize( + "prod", + ImmutableMap.of( + CatalogProperties.URI, httpServer.getURI().toString(), "credential", "catalog:12345")); + } + + @AfterEach + public void closeCatalog() throws Exception { + if (null != catalog) { + catalog.close(); + } + + if (null != backendCatalog) { + backendCatalog.close(); + } + + if (null != httpServer) { + httpServer.stop(); + httpServer.join(); + } + } +}