Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,14 @@ acceptedBreaks:
old: "class org.apache.iceberg.Metrics"
new: "class org.apache.iceberg.Metrics"
justification: "Java serialization across versions is not guaranteed"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.Table org.apache.iceberg.catalog.SessionCatalog::registerTable(org.apache.iceberg.catalog.SessionCatalog.SessionContext,\
\ org.apache.iceberg.catalog.TableIdentifier, java.lang.String, boolean)"
justification: "New API with default implementation provided"
org.apache.iceberg:iceberg-core:
- code: "java.method.addedToInterface"
new: "method boolean org.apache.iceberg.rest.requests.RegisterTableRequest::overwrite()"
justification: "All subclasses implement the new method"
- code: "java.method.removed"
old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\
\ org.apache.hadoop.conf.Configuration)"
Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,25 @@ default void invalidateTable(TableIdentifier identifier) {}
* @throws AlreadyExistsException if the table already exists in the catalog.
*/
default Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
  // Plain registration must never replace a table that is already present, so delegate to the
  // overwrite-aware variant with overwriting switched off.
  boolean overwrite = false;
  return registerTable(identifier, metadataFileLocation, overwrite);
}

/**
* Register a table with the catalog, optionally overwriting existing table metadata.
*
* <p><strong>Note:</strong> Overwriting an existing table may result in a change of table UUID,
* to match the one in the metadata file.
*
* @param identifier a table identifier
* @param metadataFileLocation the location of a metadata file
* @param overwrite if true, overwrite the existing table with provided metadata
* @return a Table instance
* @throws AlreadyExistsException if the table already exists in the catalog and overwrite is
* false.
*/
default Table registerTable(
TableIdentifier identifier, String metadataFileLocation, boolean overwrite) {
throw new UnsupportedOperationException("Registering tables is not supported");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is worthwhile, but you could decide to fail only if "overwrite" is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @RussellSpitzer. In this PR I introduced a new register-overwrite API on the catalog interface and made it the new base for register-table (which calls the new API with overwrite=false).

Before:

default register-table API throws UnsupportedOperationException

After:

default register-table API -> register-overwrite(overwrite=false)
default register-overwrite API throws UnsupportedOperationException


The benefit is that all concrete catalog implementations can just implement the new API, and the interface is only used for delegation between the two APIs. This is easier to reason about (as all register logic sits in one place) and follows the same convention as drop-table and drop-table-purge.

The potential downside is that some custom catalog implementations outside the Iceberg repo that implement the register-table API might need to update their code when upgrading the Iceberg dependency with this interface change. I feel it's generally justified for a custom catalog to keep up with Iceberg interface changes. Please let me know if you think otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right here, let's just keep it this way (clean it up in 2.0)

}

Expand Down
25 changes: 24 additions & 1 deletion api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,30 @@ public Object wrappedIdentity() {
* @return a Table instance
* @throws AlreadyExistsException if the table already exists in the catalog.
*/
Table registerTable(SessionContext context, TableIdentifier ident, String metadataFileLocation);
default Table registerTable(
    SessionContext context, TableIdentifier ident, String metadataFileLocation) {
  // Register only if the table does not exist yet: delegate with overwriting disabled.
  boolean overwrite = false;
  return registerTable(context, ident, metadataFileLocation, overwrite);
}

/**
 * Register a table with the catalog, optionally overwriting existing table metadata.
 *
 * <p><strong>Note:</strong> Overwriting an existing table may result in a change of table UUID,
 * to match the one in the metadata file.
 *
 * @param context session context
 * @param ident a table identifier
 * @param metadataFileLocation the location of a metadata file
 * @param overwrite if true, overwrite the existing table with the provided metadata; if false,
 *     registration fails when the table already exists
 * @return a Table instance
 * @throws AlreadyExistsException if the table already exists in the catalog and overwrite is
 *     false.
 */
Table registerTable(
SessionContext context,
TableIdentifier ident,
String metadataFileLocation,
boolean overwrite);

/**
* Check whether table exists.
Expand Down
43 changes: 34 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -71,24 +70,43 @@ public Table loadTable(TableIdentifier identifier) {
}

@Override
public Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
public Table registerTable(
TableIdentifier identifier, String metadataFileLocation, boolean overwrite) {
Preconditions.checkArgument(
identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
Preconditions.checkArgument(
metadataFileLocation != null && !metadataFileLocation.isEmpty(),
"Cannot register an empty metadata file location as a table");

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
TableOperations ops = newTableOps(identifier);

if (overwrite) {
return overwriteTable(identifier, metadataFileLocation, ops);
} else if (ops.current() == null) {
// create a new table
TableMetadata newMetadata =
TableMetadataParser.read(ops.io().newInputFile(metadataFileLocation));
ops.commit(null, newMetadata);
return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
} else {
throw new AlreadyExistsException("Table already exists: %s", identifier);
}
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = ops.io().newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(metadataFile);
ops.commit(null, metadata);
private Table overwriteTable(
TableIdentifier identifier, String metadataFileLocation, TableOperations ops) {
TableMetadata currentMetadata = ops.current();

return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
if (currentMetadata != null
&& currentMetadata.metadataFileLocation().equals(metadataFileLocation)) {
LOG.info(
"The requested metadata matches the existing metadata. No changes will be committed.");
return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
}

// create or replace table metadata atomically
setAsCurrent(identifier, metadataFileLocation);
return loadTable(identifier);
}

@Override
Expand Down Expand Up @@ -299,6 +317,13 @@ protected MetricsReporter metricsReporter() {
return metricsReporter;
}

/**
 * Atomically set the given metadata location as the current table metadata.
 *
 * <p>This base implementation does not support the operation and always throws; catalogs that
 * can overwrite table metadata on registration override it.
 *
 * @param identifier the table whose current metadata should be replaced
 * @param metadataLocation the metadata file location to make current
 * @throws UnsupportedOperationException always, in this base implementation
 */
protected void setAsCurrent(TableIdentifier identifier, String metadataLocation) {
  String message =
      String.format(
          "Overwrite table metadata on registration is not supported in %s catalog", name());
  throw new UnsupportedOperationException(message);
}

@Override
public void close() throws IOException {
if (metricsReporter != null) {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/CachingCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ public void invalidateTable(TableIdentifier ident) {
}

@Override
public Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Table table = catalog.registerTable(identifier, metadataFileLocation);
public Table registerTable(
TableIdentifier identifier, String metadataFileLocation, boolean overwrite) {
Table table = catalog.registerTable(identifier, metadataFileLocation, overwrite);
invalidateTable(identifier);
return table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ public TableBuilder buildTable(TableIdentifier ident, Schema schema) {
}

@Override
public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation);
public Table registerTable(
TableIdentifier ident, String metadataFileLocation, boolean overwrite) {
return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation, overwrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
defaultNamespaceLocation(tableIdentifier.namespace()), tableIdentifier.name());
}

/**
 * Records {@code metadataLocation} as the entry for {@code identifier} in {@code tables}.
 *
 * <p>The {@code put} is unconditional: any previous entry for the identifier is replaced, and no
 * check of the previous value is made here.
 */
@Override
protected void setAsCurrent(TableIdentifier identifier, String metadataLocation) {
// Hold this catalog's monitor while updating the map.
// NOTE(review): presumably this matches the locking used by other writers of `tables` -- confirm.
synchronized (this) {
tables.put(identifier, metadataLocation);
}
}

private String defaultNamespaceLocation(Namespace namespace) {
if (namespace.isEmpty()) {
return warehouseLocation;
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.DataTruncation;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -28,6 +29,7 @@
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLWarning;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
Expand All @@ -49,6 +51,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -283,6 +286,36 @@ protected String defaultWarehouseLocation(TableIdentifier table) {
return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name());
}

/**
 * Points the catalog record of {@code tableIdentifier} at {@code metadataLocation}.
 *
 * <p>The update is considered successful only when exactly one record was changed; any other
 * count is reported as a {@link CommitFailedException}. SQL and interruption failures are
 * rethrown as unchecked exceptions.
 */
@Override
protected void setAsCurrent(TableIdentifier tableIdentifier, String metadataLocation) {
  try {
    int rowsUpdated =
        JdbcUtil.setMetadataLocationTable(
            schemaVersion, connections, catalogName, tableIdentifier, metadataLocation);

    // Anything other than a single updated record means the table record was not replaced.
    if (rowsUpdated != 1) {
      throw new CommitFailedException(
          "Failed to update table %s to %s from catalog %s",
          tableIdentifier, metadataLocation, catalogName);
    }

    LOG.debug(
        "Successfully committed {} to existing table: {}", metadataLocation, tableIdentifier);
  } catch (SQLTimeoutException e) {
    throw new UncheckedSQLException(e, "Database Connection timeout");
  } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) {
    throw new UncheckedSQLException(e, "Database Connection failed");
  } catch (DataTruncation e) {
    // DataTruncation is a SQLWarning, so it has to be handled before the SQLWarning clause.
    throw new UncheckedSQLException(e, "Database data truncation error");
  } catch (SQLWarning e) {
    throw new UncheckedSQLException(e, "Database warning");
  } catch (SQLException e) {
    throw new UncheckedSQLException(e, "Unknown failure");
  } catch (InterruptedException e) {
    // Restore the interrupt flag before surfacing the failure.
    Thread.currentThread().interrupt();
    throw new UncheckedInterruptedException(e, "Interrupted during setAsCurrent");
  }
}

@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
TableOperations ops = newTableOps(identifier);
Expand Down
88 changes: 88 additions & 0 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,57 @@ enum SchemaVersion {
+ " = ? AND "
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ?";
// V1 schema: sets the metadata location of the record matching catalog name, namespace and
// name, restricted to records whose record type is the table type or NULL
// (presumably NULL covers records written before the record type column existed -- TODO confirm).
// Unlike a compare-and-swap update, the WHERE clause does not check the previous metadata
// location. Placeholder order: new metadata location, catalog name, namespace, name.
private static final String V1_SET_METADATA_TABLE_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
+ " SET "
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ?"
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ TABLE_NAMESPACE
+ " = ? AND "
+ TABLE_NAME
+ " = ? AND ("
+ RECORD_TYPE
+ " = '"
+ TABLE_RECORD_TYPE
+ "'"
+ " OR "
+ RECORD_TYPE
+ " IS NULL)";
// V1 schema: same unconditional metadata location update, restricted to records whose record
// type is the view type. Placeholder order: new metadata location, catalog name, namespace, name.
private static final String V1_SET_METADATA_VIEW_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
+ " SET "
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ?"
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ TABLE_NAMESPACE
+ " = ? AND "
+ TABLE_NAME
+ " = ? AND "
+ RECORD_TYPE
+ " = "
+ "'"
+ VIEW_RECORD_TYPE
+ "'";
// V0 schema: same unconditional metadata location update with no record type filter.
// Placeholder order: new metadata location, catalog name, namespace, name.
private static final String V0_SET_METADATA_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
+ " SET "
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ?"
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ TABLE_NAMESPACE
+ " = ? AND "
+ TABLE_NAME
+ " = ?";
static final String V0_CREATE_CATALOG_SQL =
"CREATE TABLE "
+ CATALOG_TABLE_VIEW_NAME
Expand Down Expand Up @@ -527,6 +578,32 @@ static Properties filterAndRemovePrefix(Map<String, String> properties, String p
return result;
}

/**
 * Runs the "set metadata location" UPDATE for a table or view record and returns the number of
 * records changed.
 *
 * @param isTable true to target a table record, false to target a view record (only relevant
 *     for the V1 schema)
 * @param schemaVersion the catalog schema version, which selects the SQL statement
 * @param connections the client pool used to run the statement
 * @param catalogName the catalog name to match
 * @param identifier the identifier whose namespace and name are matched
 * @param newMetadataLocation the metadata location to store
 * @return the update count reported by the statement
 */
private static int setMetadataLocation(
    boolean isTable,
    SchemaVersion schemaVersion,
    JdbcClientPool connections,
    String catalogName,
    TableIdentifier identifier,
    String newMetadataLocation)
    throws SQLException, InterruptedException {
  // Pick the statement up front: the V1 schema distinguishes tables from views, V0 does not.
  final String updateSql;
  if (schemaVersion != SchemaVersion.V1) {
    updateSql = V0_SET_METADATA_SQL;
  } else if (isTable) {
    updateSql = V1_SET_METADATA_TABLE_SQL;
  } else {
    updateSql = V1_SET_METADATA_VIEW_SQL;
  }

  return connections.run(
      connection -> {
        try (PreparedStatement statement = connection.prepareStatement(updateSql)) {
          // SET clause
          statement.setString(1, newMetadataLocation);
          // WHERE clause
          statement.setString(2, catalogName);
          statement.setString(3, namespaceToString(identifier.namespace()));
          statement.setString(4, identifier.name());
          return statement.executeUpdate();
        }
      });
}

private static int update(
boolean isTable,
SchemaVersion schemaVersion,
Expand Down Expand Up @@ -557,6 +634,17 @@ private static int update(
});
}

/**
 * Sets the metadata location stored for a table record and returns the number of records
 * changed.
 */
static int setMetadataLocationTable(
    SchemaVersion schemaVersion,
    JdbcClientPool connections,
    String catalogName,
    TableIdentifier tableIdentifier,
    String metadataLocation)
    throws SQLException, InterruptedException {
  // Table flavour of the shared helper.
  boolean isTable = true;
  return setMetadataLocation(
      isTable, schemaVersion, connections, catalogName, tableIdentifier, metadataLocation);
}

static int updateTable(
SchemaVersion schemaVersion,
JdbcClientPool connections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ public static LoadTableResponse registerTable(
request.validate();

TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
Table table = catalog.registerTable(identifier, request.metadataLocation());
Table table =
catalog.registerTable(
identifier, request.metadataLocation(), Boolean.TRUE.equals(request.overwrite()));
if (table instanceof BaseTable) {
return LoadTableResponse.builder()
.withTableMetadata(((BaseTable) table).operations().current())
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
return delegate.registerTable(ident, metadataFileLocation);
}

@Override
public Table registerTable(
    TableIdentifier ident, String metadataFileLocation, boolean overwrite) {
  // Forward the call, including the overwrite flag, to the wrapped catalog unchanged.
  Table registered = delegate.registerTable(ident, metadataFileLocation, overwrite);
  return registered;
}

@Override
public void createNamespace(Namespace ns, Map<String, String> props) {
nsDelegate.createNamespace(ns, props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,10 @@ public void invalidateTable(SessionContext context, TableIdentifier ident) {}

@Override
public Table registerTable(
SessionContext context, TableIdentifier ident, String metadataFileLocation) {
SessionContext context,
TableIdentifier ident,
String metadataFileLocation,
boolean overwrite) {
Endpoint.check(endpoints, Endpoint.V1_REGISTER_TABLE);
checkIdentifierIsValid(ident);

Expand All @@ -512,6 +515,7 @@ public Table registerTable(
ImmutableRegisterTableRequest.builder()
.name(ident.name())
.metadataLocation(metadataFileLocation)
.overwrite(overwrite)
.build();

AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
Expand Down
Loading