Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;

Expand Down Expand Up @@ -172,4 +173,14 @@ default ManageSnapshots manageSnapshots() {
* @throws CommitFailedException If the updates cannot be committed due to conflicts.
*/
void commitTransaction();

/** Rolls back any pending changes. */
default void rollback() {
throw new UnsupportedOperationException("Rollback not supported");
}

/** Provides access to the pending changes that are about to be committed. */
default List<PendingUpdate> pendingUpdates() {
throw new UnsupportedOperationException("Pending Updates not supported");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

public interface CatalogTransaction {

enum IsolationLevel {

/**
* All reads that are being made will see the last committed values that existed when the table
* was loaded inside the catalog transaction. Will successfully commit only if the values
* updated by the transaction do not conflict with other concurrent updates. <br>
* <br>
*
* <p>Note that under SNAPSHOT isolation a <b>write skew anomaly</b> is acceptable and
* permitted. In a <b>write skew anomaly</b>, two transactions (T1 and T2) concurrently read an
* overlapping data set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1
* updates V1, T2 updates V2), and finally concurrently commit, neither having seen the update
* performed by the other.
*/
SNAPSHOT,

/**
* All reads that are being made will see the last committed values that existed when the table
* was loaded inside the catalog transaction. All tables participating in the transaction must
* be in the same state when committing compared to when the table was loaded first within the
* catalog transaction.<br>
* <br>
*
* <p>Note that a <b>write skew anomaly</b> is not possible under SERIALIZABLE isolation, where
* two transactions (T1 and T2) concurrently read an overlapping data set (e.g. values V1 and
* V2), concurrently make disjoint updates (e.g. T1 updates V1, T2 updates V2). This is because
* under SERIALIZABLE isolation either T1 or T2 would have to occur first and be visible to the
* other transaction.
*/
SERIALIZABLE;
}

/**
* Performs an atomic commit of all the pending changes across multiple tables. Engine-specific
* implementations must ensure that all pending changes are applied atomically.
*/
void commitTransaction();

/** Rolls back any pending changes across tables. */
void rollback();

/**
* Returns this catalog transaction as a {@link Catalog} API so that any actions that are called
* through this API are participating in this catalog transaction.
*
* @return This catalog transaction as a {@link Catalog} API. Any actions that are called through
* this API are participating in this catalog transaction.
*/
Catalog asCatalog();

/**
* Returns the current {@link IsolationLevel} for this transaction.
*
* @return The {@link IsolationLevel} for this transaction.
*/
IsolationLevel isolationLevel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

import java.util.Set;
import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel;

public interface SupportsCatalogTransactions {

/**
* Get the set of {@link IsolationLevel}s supported by this Catalog.
*
* @return A set of isolation levels supported.
*/
Set<IsolationLevel> supportedIsolationLevels();

/**
* Start a new {@link CatalogTransaction} with the given {@link IsolationLevel}.
*
* @param isolationLevel The isolation level to use.
* @return A new {@link CatalogTransaction}.
*/
CatalogTransaction startTransaction(IsolationLevel isolationLevel);
}
254 changes: 254 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseCatalogTransaction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.BaseTransaction.TransactionTable;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.CatalogTransaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Tasks;

public abstract class BaseCatalogTransaction implements CatalogTransaction {
private final Map<TableIdentifier, Transaction> txByTable;
private final Map<TableIdentifier, TableMetadata> initiallyReadTableMetadata;
private final IsolationLevel isolationLevel;
private final BaseMetastoreCatalog origin;
private boolean hasCommitted = false;

public BaseCatalogTransaction(BaseMetastoreCatalog origin, IsolationLevel isolationLevel) {
Preconditions.checkArgument(null != origin, "Invalid origin catalog: null");
Preconditions.checkArgument(null != isolationLevel, "Invalid isolation level: null");
this.origin = origin;
this.isolationLevel = isolationLevel;
this.txByTable = Maps.newConcurrentMap();
this.initiallyReadTableMetadata = Maps.newConcurrentMap();
}

@Override
public void rollback() {
Tasks.foreach(txByTable.values()).run(Transaction::rollback);
txByTable.clear();
initiallyReadTableMetadata.clear();
}

protected Map<TableIdentifier, Transaction> txByTable() {
return txByTable;
}

protected Map<TableIdentifier, TableMetadata> initiallyReadTableMetadata() {
return initiallyReadTableMetadata;
}

protected Catalog origin() {
return origin;
}

protected boolean hasCommitted() {
return hasCommitted;
}

public void setHasCommitted(boolean hasCommitted) {
this.hasCommitted = hasCommitted;
}

private TableIdentifier identifierWithoutCatalog(String tableWithCatalog) {
if (tableWithCatalog.startsWith(origin.name())) {
return TableIdentifier.parse(tableWithCatalog.replace(origin.name() + ".", ""));
}
return TableIdentifier.parse(tableWithCatalog);
}

@Override
public Catalog asCatalog() {
return new AsTransactionalCatalog();
}

private Optional<Table> txTable(TableIdentifier identifier) {
if (txByTable.containsKey(identifier)) {
return Optional.ofNullable(txByTable.get(identifier).table());
}

return Optional.empty();
}

private Transaction txForTable(Table table) {
return txByTable.computeIfAbsent(
identifierWithoutCatalog(table.name()),
k -> {
TableOperations operations = ((HasTableOperations) table).operations();
return Transactions.newTransaction(table.name(), operations);
});
}

@Override
public IsolationLevel isolationLevel() {
return isolationLevel;
}

public class AsTransactionalCatalog extends BaseMetastoreCatalog {
@Override
public Table loadTable(TableIdentifier identifier) {
Table table =
BaseCatalogTransaction.this
.txTable(identifier)
.orElseGet(
() -> {
Table loadTable = origin.loadTable(identifier);

// we need to remember the very first version of table metadata that we read
if (IsolationLevel.SERIALIZABLE == isolationLevel()) {
initiallyReadTableMetadata.computeIfAbsent(
identifier, (ident) -> ((BaseTable) loadTable).operations().current());
}

return loadTable;
});

TableOperations tableOps =
table instanceof TransactionTable
? ((TransactionTable) table).operations()
: ((BaseTable) table).operations();
return new TransactionalTable(table, tableOps);
}

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
Optional<Table> txTable = BaseCatalogTransaction.this.txTable(tableIdentifier);
if (txTable.isPresent()) {
return ((TransactionTable) txTable.get()).operations();
}
return origin.newTableOps(tableIdentifier);
}

@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
return origin.defaultWarehouseLocation(tableIdentifier);
}

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
return origin.listTables(namespace);
}

@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
return origin.dropTable(identifier, purge);
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
origin.renameTable(from, to);
}
}

private class TransactionalTable extends BaseTable {
private final Table table;

private TransactionalTable(Table table, TableOperations ops) {
super(ops, table.name());
this.table = table;
}

@Override
public UpdateSchema updateSchema() {
return txForTable(table).updateSchema();
}

@Override
public UpdatePartitionSpec updateSpec() {
return txForTable(table).updateSpec();
}

@Override
public UpdateProperties updateProperties() {
return txForTable(table).updateProperties();
}

@Override
public ReplaceSortOrder replaceSortOrder() {
return txForTable(table).replaceSortOrder();
}

@Override
public UpdateLocation updateLocation() {
return txForTable(table).updateLocation();
}

@Override
public AppendFiles newAppend() {
return txForTable(table).newAppend();
}

@Override
public AppendFiles newFastAppend() {
return txForTable(table).newFastAppend();
}

@Override
public RewriteFiles newRewrite() {
return txForTable(table).newRewrite();
}

@Override
public RewriteManifests rewriteManifests() {
return txForTable(table).rewriteManifests();
}

@Override
public OverwriteFiles newOverwrite() {
return txForTable(table).newOverwrite();
}

@Override
public RowDelta newRowDelta() {
return txForTable(table).newRowDelta();
}

@Override
public ReplacePartitions newReplacePartitions() {
return txForTable(table).newReplacePartitions();
}

@Override
public DeleteFiles newDelete() {
return txForTable(table).newDelete();
}

@Override
public UpdateStatistics updateStatistics() {
return txForTable(table).updateStatistics();
}

@Override
public ExpireSnapshots expireSnapshots() {
return txForTable(table).expireSnapshots();
}

@Override
public ManageSnapshots manageSnapshots() {
return txForTable(table).manageSnapshots();
}
}
}
Loading