Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,19 @@ default boolean dropTable(TableIdentifier identifier) {
default void invalidateTable(TableIdentifier identifier) {
}

/**
 * Register a table with the catalog if it does not exist.
 *
 * <p>Registration creates a catalog entry pointing at an existing metadata file; it does not
 * write new table metadata.
 *
 * @param identifier a table identifier
 * @param metadataFileLocation the location of a metadata file
 * @return a Table instance
 * @throws AlreadyExistsException if the table already exists in the catalog.
 * @throws UnsupportedOperationException if this catalog does not support registering tables
 *         (the default implementation always throws)
 */
default Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
throw new UnsupportedOperationException("Registering tables is not supported");
}

/**
* Instantiate a builder to either create a table or start a create/replace transaction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -49,6 +51,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -211,6 +214,23 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}

@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put this in BaseMetastoreCatalog? It doesn't look like there is anything Hive-specific here, so other catalog implementations could potentially benefit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. Potentially, other catalogs can benefit from this. However, FileIO is initialized with the catalog and there maybe custom implementations passed as catalog properties. I'm not sure yet on how to move this logic to BaseMetastoreCatalog. Do you mind if I do that as a seperate enhancement PR to this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks

Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, identifier.toString());
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ protected void doRefresh() {
@SuppressWarnings("checkstyle:CyclomaticComplexity")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
String newMetadataLocation = base == null && metadata.metadataFileLocation() != null ?
metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);
Comment on lines +213 to +214
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this change is not related. Can we remove it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it used by the new test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this change, registering a table creates a new metadata file with a new version instead of using the version provided by the metadata file. Yes, the tests also rely on this.

boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.ConfigProperties;
Expand All @@ -65,6 +66,7 @@
import static java.nio.file.attribute.PosixFilePermissions.fromString;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
Expand Down Expand Up @@ -370,6 +372,51 @@ public void testNonDefaultDatabaseLocation() throws IOException, TException {
metastoreClient.dropDatabase(NON_DEFAULT_DATABASE, true, true, true);
}

@Test
public void testRegisterTable() throws TException {
// Snapshot the HMS view of the table before dropping it from the catalog.
org.apache.hadoop.hive.metastore.api.Table hmsTableBefore = metastoreClient.getTable(DB_NAME, TABLE_NAME);

Map<String, String> paramsBefore = hmsTableBefore.getParameters();
Assert.assertNotNull(paramsBefore);
Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(paramsBefore.get(TABLE_TYPE_PROP)));
Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(hmsTableBefore.getTableType()));

// Drop the catalog entry but keep the data and metadata files (purge = false).
catalog.dropTable(TABLE_IDENTIFIER, false);
Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));

List<String> versionFiles = metadataVersionFiles(TABLE_NAME);
Assert.assertEquals(1, versionFiles.size());

// Re-register the table from the surviving metadata version file.
catalog.registerTable(TABLE_IDENTIFIER, "file:" + versionFiles.get(0));

org.apache.hadoop.hive.metastore.api.Table hmsTableAfter = metastoreClient.getTable(DB_NAME, TABLE_NAME);

// The re-registered table must match the original and carry no previous-metadata pointer.
Map<String, String> paramsAfter = hmsTableAfter.getParameters();
Assert.assertNull(paramsAfter.get(PREVIOUS_METADATA_LOCATION_PROP));
Assert.assertEquals(paramsBefore.get(TABLE_TYPE_PROP), paramsAfter.get(TABLE_TYPE_PROP));
Assert.assertEquals(paramsBefore.get(METADATA_LOCATION_PROP), paramsAfter.get(METADATA_LOCATION_PROP));
Assert.assertEquals(hmsTableBefore.getSd(), hmsTableAfter.getSd());
}

@Test
public void testRegisterExistingTable() throws TException {
// Sanity-check that the table is live in HMS and is an external Iceberg table.
org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);

Map<String, String> hmsParams = hmsTable.getParameters();
Assert.assertNotNull(hmsParams);
Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(hmsParams.get(TABLE_TYPE_PROP)));
Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(hmsTable.getTableType()));

List<String> versionFiles = metadataVersionFiles(TABLE_NAME);
Assert.assertEquals(1, versionFiles.size());

// Try to register an existing table
AssertHelpers.assertThrows(
    "Should complain that the table already exists", AlreadyExistsException.class,
    "Table already exists",
    () -> catalog.registerTable(TABLE_IDENTIFIER, "file:" + versionFiles.get(0)));
}

@Test
public void testEngineHiveEnabledDefault() throws TException {
// Drop the previously created table to make place for the new one
Expand Down