diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java index 897acd2e3ba6..47a61a6914d7 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java +++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java @@ -347,6 +347,21 @@ default Table registerTable(TableIdentifier identifier, String metadataFileLocat throw new UnsupportedOperationException("Registering tables is not supported"); } + /** + * Register a table with the catalog if it does not exist overwrite table metadata if the table + * already exists. + * + * @param identifier a table identifier + * @param metadataFileLocation the location of a metadata file + * @param overwrite whether to overwrite table metadata if the table already exists + * @return a Table instance + * @throws AlreadyExistsException if the table already exists in the catalog. + */ + default Table registerTable( + TableIdentifier identifier, String metadataFileLocation, boolean overwrite) { + throw new UnsupportedOperationException("Registering tables is not supported"); + } + /** * Instantiate a builder to either create a table or start a create/replace transaction. * diff --git a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java index fe29f8918531..46ba5e1e78c1 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java +++ b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java @@ -170,6 +170,25 @@ public Object wrappedIdentity() { */ Table registerTable(SessionContext context, TableIdentifier ident, String metadataFileLocation); + /** + * Register a table with the catalog if it does not exist overwrite table metadata if the table + * already exists. + * + * @param context session context + * @param ident a table identifier + * @param metadataFileLocation the location of a metadata file + * @param overwrite whether to overwrite table metadata if the table already exists + * @return a Table instance + * @throws AlreadyExistsException if the table already exists in the catalog. + */ + default Table registerTable( + SessionContext context, + TableIdentifier ident, + String metadataFileLocation, + boolean overwrite) { + return registerTable(context, ident, metadataFileLocation); + } + /** * Check whether table exists. * diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 940d7fa05ec6..8cbdd1d831d1 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -72,21 +72,30 @@ public Table loadTable(TableIdentifier identifier) { @Override public Table registerTable(TableIdentifier identifier, String metadataFileLocation) { + return registerTable(identifier, metadataFileLocation, false); + } + + @Override + public Table registerTable( + TableIdentifier identifier, String metadataFileLocation, boolean overwrite) { Preconditions.checkArgument( identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier); Preconditions.checkArgument( metadataFileLocation != null && !metadataFileLocation.isEmpty(), "Cannot register an empty metadata file location as a table"); - // Throw an exception if this table already exists in the catalog. - if (tableExists(identifier)) { + // Throw an exception if this table already exists in the catalog and + // there is no intention to overwrite metadata. + if (!overwrite && tableExists(identifier)) { throw new AlreadyExistsException("Table already exists: %s", identifier); } TableOperations ops = newTableOps(identifier); InputFile metadataFile = ops.io().newInputFile(metadataFileLocation); TableMetadata metadata = TableMetadataParser.read(metadataFile); - ops.commit(null, metadata); + + TableMetadata current = overwrite ? ops.current() : null; + ops.commit(current, metadata); return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter()); } diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java index 913f1a9482e1..5c92fe7db602 100644 --- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java +++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java @@ -197,6 +197,14 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati return table; } + @Override + public Table registerTable( + TableIdentifier identifier, String metadataFileLocation, boolean overwrite) { + Table table = catalog.registerTable(identifier, metadataFileLocation, overwrite); + invalidateTable(identifier); + return table; + } + private Iterable metadataTableIdentifiers(TableIdentifier ident) { ImmutableList.Builder builder = ImmutableList.builder(); diff --git a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java index d6ee4d345cfa..ae830791def2 100644 --- a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java @@ -89,6 +89,12 @@ public Table registerTable(TableIdentifier ident, String metadataFileLocation) { return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation); } + @Override + public Table registerTable( + TableIdentifier ident, String metadataFileLocation, boolean overwrite) { + return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation, overwrite); + } + @Override public boolean tableExists(TableIdentifier ident) { return BaseSessionCatalog.this.tableExists(context, ident); diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 83c165ed668f..48d2910ee053 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -295,7 +295,8 @@ public static LoadTableResponse registerTable( request.validate(); TableIdentifier identifier = TableIdentifier.of(namespace, request.name()); - Table table = catalog.registerTable(identifier, request.metadataLocation()); + Table table = + catalog.registerTable(identifier, request.metadataLocation(), request.overwrite()); if (table instanceof BaseTable) { return LoadTableResponse.builder() .withTableMetadata(((BaseTable) table).operations().current()) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 0176128ed576..425a9e194190 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -222,6 +222,12 @@ public Table registerTable(TableIdentifier ident, String metadataFileLocation) { return delegate.registerTable(ident, metadataFileLocation); } + @Override + public Table registerTable( + TableIdentifier identifier, String metadataFileLocation, boolean overwrite) { + return delegate.registerTable(identifier, metadataFileLocation, overwrite); + } + @Override public void createNamespace(Namespace ns, Map props) { nsDelegate.createNamespace(ns, props); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index b903f13adc09..892d81982960 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -500,6 +500,15 @@ public void invalidateTable(SessionContext context, TableIdentifier ident) {} @Override public Table registerTable( SessionContext context, TableIdentifier ident, String metadataFileLocation) { + return registerTable(context, ident, metadataFileLocation, false); + } + + @Override + public Table registerTable( + SessionContext context, + TableIdentifier ident, + String metadataFileLocation, + boolean overwrite) { Endpoint.check(endpoints, Endpoint.V1_REGISTER_TABLE); checkIdentifierIsValid(ident); @@ -512,6 +521,7 @@ public Table registerTable( ImmutableRegisterTableRequest.builder() .name(ident.name()) .metadataLocation(metadataFileLocation) + .overwrite(overwrite) .build(); AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequest.java index 33b37dae242f..e164a7ec5154 100644 --- a/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequest.java +++ b/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequest.java @@ -28,6 +28,11 @@ public interface RegisterTableRequest extends RESTRequest { String metadataLocation(); + @Value.Default + default boolean overwrite() { + return false; + } + @Override default void validate() { // nothing to validate as it's not possible to create an invalid instance diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequestParser.java b/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequestParser.java index 961b6c185b87..aa2d2eb2cc21 100644 --- a/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequestParser.java +++ b/core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequestParser.java @@ -28,6 +28,7 @@ public class RegisterTableRequestParser { private static final String NAME = "name"; private static final String METADATA_LOCATION = "metadata-location"; + private static final String OVERWRITE = "overwrite"; private RegisterTableRequestParser() {} @@ -46,6 +47,7 @@ public static void toJson(RegisterTableRequest request, JsonGenerator gen) throw gen.writeStringField(NAME, request.name()); gen.writeStringField(METADATA_LOCATION, request.metadataLocation()); + gen.writeBooleanField(OVERWRITE, request.overwrite()); gen.writeEndObject(); } @@ -60,10 +62,12 @@ public static RegisterTableRequest fromJson(JsonNode json) { String name = JsonUtil.getString(NAME, json); String metadataLocation = JsonUtil.getString(METADATA_LOCATION, json); + Boolean overwrite = JsonUtil.getBool(OVERWRITE, json); return ImmutableRegisterTableRequest.builder() .name(name) .metadataLocation(metadataLocation) + .overwrite(Boolean.TRUE.equals(overwrite)) .build(); } } diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestRegisterTableRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestRegisterTableRequestParser.java index 50a47df974a3..611ea7b9f984 100644 --- a/core/src/test/java/org/apache/iceberg/rest/requests/TestRegisterTableRequestParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestRegisterTableRequestParser.java @@ -67,7 +67,32 @@ public void roundTripSerde() { String expectedJson = "{\n" + " \"name\" : \"table_1\",\n" - + " \"metadata-location\" : \"file://tmp/NS/test_tbl/metadata/00000-d4f60d2f-2ad2-408b-8832-0ed7fbd851ee.metadata.json\"\n" + + " \"metadata-location\" : \"file://tmp/NS/test_tbl/metadata/00000-d4f60d2f-2ad2-408b-8832-0ed7fbd851ee.metadata.json\",\n" + + " \"overwrite\" : false\n" + + "}"; + + String json = RegisterTableRequestParser.toJson(request, true); + assertThat(json).isEqualTo(expectedJson); + + assertThat(RegisterTableRequestParser.toJson(RegisterTableRequestParser.fromJson(json), true)) + .isEqualTo(expectedJson); + } + + @Test + public void roundTripSerdeWithOverwrite() { + RegisterTableRequest request = + ImmutableRegisterTableRequest.builder() + .name("table_1") + .metadataLocation( + "file://tmp/NS/test_tbl/metadata/00000-d4f60d2f-2ad2-408b-8832-0ed7fbd851ee.metadata.json") + .overwrite(true) + .build(); + + String expectedJson = + "{\n" + + " \"name\" : \"table_1\",\n" + + " \"metadata-location\" : \"file://tmp/NS/test_tbl/metadata/00000-d4f60d2f-2ad2-408b-8832-0ed7fbd851ee.metadata.json\",\n" + + " \"overwrite\" : true\n" + "}"; String json = RegisterTableRequestParser.toJson(request, true); diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index f9809714c3c2..c31e19b948a3 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -725,9 +725,10 @@ Creates a catalog entry for a metadata.json file which already exists but does n #### Usage | Argument Name | Required? | Type | Description | -|---------------|-----------|------|-------------| -| `table` | ✔️ | string | Table which is to be registered | -| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | +|---------------|----------|------|-------------| +| `table` | ✔️ | string | Table which is to be registered | +| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | +| `overwrite`| ️ | boolean | Whether to overwrite table metadata if the table already exists | !!! warning Having the same metadata.json registered in more than one catalog can lead to missing updates, loss of data, and table corruption. @@ -750,6 +751,14 @@ CALL spark_catalog.system.register_table( metadata_file => 'path/to/metadata/file.json' ); ``` +Re-register an existing table `db.tbl` in `spark_catalog` to point to new metadata.json file `path/to/metadata/file.json`. +```sql +CALL spark_catalog.system.register_table( + table => 'db.tbl', + metadata_file => 'path/to/metadata/file.json', + overwrite => true +); +``` ## Metadata information diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java index a06a67b7d612..e5d67354e3ac 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java @@ -19,12 +19,16 @@ package org.apache.iceberg.spark.extensions; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.atIndex; +import java.util.Arrays; import java.util.List; +import java.util.Set; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.TableUtil; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.spark.Spark3Util; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.parser.ParseException; @@ -38,17 +42,17 @@ @ExtendWith(ParameterizedTestExtension.class) public class TestRegisterTableProcedure extends ExtensionsTestBase { - private String targetName; + private String targetTableName; @BeforeEach public void setTargetName() { - targetName = tableName("register_table"); + targetTableName = tableName("register_table"); } @AfterEach public void dropTables() { sql("DROP TABLE IF EXISTS %s", tableName); - sql("DROP TABLE IF EXISTS %s", targetName); + sql("DROP TABLE IF EXISTS %s", targetTableName); } @TestTemplate @@ -68,13 +72,15 @@ public void testRegisterTable() throws NoSuchTableException, ParseException { String metadataJson = TableUtil.metadataFileLocation(table); List result = - sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson); + sql( + "CALL %s.system.register_table('%s', '%s')", + catalogName, targetTableName, metadataJson); assertThat(result.get(0)) .as("Current Snapshot is not correct") .contains(currentSnapshotId, atIndex(0)); List original = sql("SELECT * FROM %s", tableName); - List registered = sql("SELECT * FROM %s", targetName); + List registered = sql("SELECT * FROM %s", targetTableName); assertEquals("Registered table rows should match original table rows", original, registered); assertThat(result.get(0)) .as("Should have the right row count in the procedure result") @@ -82,4 +88,110 @@ public void testRegisterTable() throws NoSuchTableException, ParseException { .as("Should have the right datafile count in the procedure result") .contains(originalFileCount, atIndex(2)); } + + @TestTemplate + public void testRegisterTableOverwrite() throws NoSuchTableException, ParseException { + long numRows = 100; + + sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName); + spark + .range(0, numRows) + .withColumn("data", functions.col("id").cast(DataTypes.StringType)) + .writeTo(tableName) + .append(); + + Table table = Spark3Util.loadIcebergTable(spark, tableName); + long originalFileCount = (long) scalarSql("SELECT COUNT(*) from %s.files", tableName); + long currentSnapshotId = table.currentSnapshot().snapshotId(); + String metadataJson = TableUtil.metadataFileLocation(table); + + List result = + sql( + "CALL %s.system.register_table('%s', '%s', '%b')", + catalogName, targetTableName, metadataJson, true); + assertThat(result.get(0)) + .as("Current Snapshot is not correct") + .contains(currentSnapshotId, atIndex(0)); + + List original = sql("SELECT * FROM %s", tableName); + List registered = sql("SELECT * FROM %s", targetTableName); + assertEquals("Registered table rows should match original table rows", original, registered); + assertThat(result.get(0)) + .as("Should have the right row count in the procedure result") + .contains(numRows, atIndex(1)) + .as("Should have the right datafile count in the procedure result") + .contains(originalFileCount, atIndex(2)); + } + + @TestTemplate + public void testReRegisterTableOverwrite() throws NoSuchTableException, ParseException { + testRegisterTable(); + + long originalRowsCount = (long) scalarSql("SELECT COUNT(*) from %s", tableName); + long additionalNumRows = 10; + + spark + .range(originalRowsCount, originalRowsCount + additionalNumRows) + .withColumn("data", functions.col("id").cast(DataTypes.StringType)) + .writeTo(tableName) + .append(); + originalRowsCount = (long) scalarSql("SELECT COUNT(*) from %s", tableName); + + // Test few writes before re-register table + spark + .range(originalRowsCount, originalRowsCount + additionalNumRows) + .withColumn("data", functions.col("id").cast(DataTypes.StringType)) + .writeTo(tableName) + .append(); + originalRowsCount = (long) scalarSql("SELECT COUNT(*) from %s", tableName); + + Table table = Spark3Util.loadIcebergTable(spark, tableName); + long originalFileCount = (long) scalarSql("SELECT COUNT(*) from %s.files", tableName); + long currentSnapshotId = table.currentSnapshot().snapshotId(); + String metadataJson = TableUtil.metadataFileLocation(table); + + List result = + sql( + "CALL %s.system.register_table('%s', '%s', '%b')", + catalogName, targetTableName, metadataJson, true); + assertThat(result.get(0)) + .as("Current Snapshot is not correct") + .contains(currentSnapshotId, atIndex(0)); + + Set original = Set.copyOf(sql("SELECT * FROM %s", tableName)); + Set registered = Set.copyOf(sql("SELECT * FROM %s", targetTableName)); + + assertThat(original) + .usingElementComparator((arr1, arr2) -> Arrays.deepEquals(arr1, arr2) ? 0 : 1) + .containsExactlyInAnyOrderElementsOf(registered); + assertThat(result.get(0)) + .as("Should have the right row count in the procedure result") + .contains(originalRowsCount, atIndex(1)) + .as("Should have the right datafile count in the procedure result") + .contains(originalFileCount, atIndex(2)); + } + + @TestTemplate + public void testReRegisterTableNotOverwrite() throws NoSuchTableException, ParseException { + testRegisterTable(); + + long numRows = 100; + + spark + .range(0, numRows) + .withColumn("data", functions.col("id").cast(DataTypes.StringType)) + .writeTo(tableName) + .append(); + + Table table = Spark3Util.loadIcebergTable(spark, tableName); + String metadataJson = TableUtil.metadataFileLocation(table); + + assertThatExceptionOfType(AlreadyExistsException.class) + .isThrownBy( + () -> + sql( + "CALL %s.system.register_table('%s', '%s', '%b')", + catalogName, targetTableName, metadataJson, false)) + .withMessageContaining("Table already exists:"); + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java index 9ba577ad7e24..8212d1e13756 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java @@ -46,9 +46,11 @@ class RegisterTableProcedure extends BaseProcedure { requiredInParameter("table", DataTypes.StringType); private static final ProcedureParameter METADATA_FILE_PARAM = requiredInParameter("metadata_file", DataTypes.StringType); + private static final ProcedureParameter OVERWRITE_PARAM = + optionalInParameter("overwrite", DataTypes.BooleanType, "false"); private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM}; + new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM, OVERWRITE_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -94,9 +96,10 @@ public Iterator call(InternalRow args) { Preconditions.checkArgument( metadataFile != null && !metadataFile.isEmpty(), "Cannot handle an empty argument metadata_file"); + boolean isOverwrite = input.asBoolean(OVERWRITE_PARAM, false); Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog(); - Table table = icebergCatalog.registerTable(tableName, metadataFile); + Table table = icebergCatalog.registerTable(tableName, metadataFile, isOverwrite); Long currentSnapshotId = null; Long totalDataFiles = null; Long totalRecords = null;