diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index d00a1519c2..d894ae0b4c 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -56,6 +56,14 @@ public HiveCatalog(Configuration conf) {
     this.closed = false;
   }
 
+  protected HiveClientPool clientPool() {
+    return clients;
+  }
+
+  protected Configuration conf() {
+    return conf;
+  }
+
   @Override
   public List<TableIdentifier> listTables(Namespace namespace) {
     Preconditions.checkArgument(namespace.levels().length == 1,
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
index a4ba98e1f9..d282c99012 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
@@ -33,10 +33,11 @@ public final class HiveCatalogs {
       .removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
       .build();
 
-  private static final Cache<String, HiveCatalog> CUSTOM_CATALOG_CACHE = Caffeine.newBuilder()
-      .expireAfterAccess(10, TimeUnit.MINUTES)
-      .removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
-      .build();
+  private static final Cache<String, HiveCatalog> HIVE_METADATA_PRESERVING_CATALOG_CACHE =
+      Caffeine.newBuilder()
+          .expireAfterAccess(10, TimeUnit.MINUTES)
+          .removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
+          .build();
 
   private HiveCatalogs() {}
 
@@ -46,9 +47,17 @@ public static HiveCatalog loadCatalog(Configuration conf) {
     return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf));
   }
 
+  /**
+   * @deprecated Use {@link #loadHiveMetadataPreservingCatalog(Configuration)} instead
+   */
+  @Deprecated
   public static HiveCatalog loadCustomCatalog(Configuration conf) {
+    return loadHiveMetadataPreservingCatalog(conf);
+  }
+
+  public static HiveCatalog loadHiveMetadataPreservingCatalog(Configuration conf) {
     // metastore URI can be null in local mode
     String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
-    return CUSTOM_CATALOG_CACHE.get(metastoreUri, uri -> new CustomHiveCatalog(conf));
+    return HIVE_METADATA_PRESERVING_CATALOG_CACHE.get(metastoreUri, uri -> new HiveMetadataPreservingCatalog(conf));
   }
 }
diff --git a/hive/src/main/java/org/apache/iceberg/hive/CustomHiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingCatalog.java
similarity index 70%
rename from hive/src/main/java/org/apache/iceberg/hive/CustomHiveCatalog.java
rename to hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingCatalog.java
index b16d577cb4..d15b3bd271 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/CustomHiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingCatalog.java
@@ -25,25 +25,21 @@
 
 /**
- * A {@link HiveCatalog} which uses {@link CustomHiveTableOperations} underneath.
+ * A {@link HiveCatalog} which uses {@link HiveMetadataPreservingTableOperations} underneath.
  *
- * This catalog is especially useful if the Iceberg operations are being performed in response to Hive operations
+ * This catalog should only be used by metadata publishers wanting to publish/update Iceberg metadata to an existing
+ * Hive table while preserving the current Hive metadata.
  */
-public class CustomHiveCatalog extends HiveCatalog {
+public class HiveMetadataPreservingCatalog extends HiveCatalog {
 
-  private final HiveClientPool clients;
-  private final Configuration conf;
-
-  public CustomHiveCatalog(Configuration conf) {
+  public HiveMetadataPreservingCatalog(Configuration conf) {
     super(conf);
-    this.clients = new HiveClientPool(2, conf);
-    this.conf = conf;
   }
 
   @Override
   public TableOperations newTableOps(TableIdentifier tableIdentifier) {
     String dbName = tableIdentifier.namespace().level(0);
     String tableName = tableIdentifier.name();
-    return new CustomHiveTableOperations(conf, clients, dbName, tableName);
+    return new HiveMetadataPreservingTableOperations(conf(), clientPool(), dbName, tableName);
   }
 }
diff --git a/hive/src/main/java/org/apache/iceberg/hive/CustomHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java
similarity index 59%
rename from hive/src/main/java/org/apache/iceberg/hive/CustomHiveTableOperations.java
rename to hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java
index 9904ba938b..d6d9f79ead 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/CustomHiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java
@@ -19,32 +19,20 @@
 package org.apache.iceberg.hive;
 
-import com.google.common.collect.Lists;
-import java.net.InetAddress;
+import com.google.common.collect.ImmutableMap;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.thrift.TException;
@@ -61,12 +49,12 @@
  * This behaviour is useful if the Iceberg metadata is being generated/updated in response to Hive metadata being
  * updated.
  */
-public class CustomHiveTableOperations extends HiveTableOperations {
+public class HiveMetadataPreservingTableOperations extends HiveTableOperations {
   private final HiveClientPool metaClients;
   private final String database;
   private final String tableName;
 
-  protected CustomHiveTableOperations(Configuration conf, HiveClientPool metaClients, String database,
+  protected HiveMetadataPreservingTableOperations(Configuration conf, HiveClientPool metaClients, String database,
       String table) {
     super(conf, metaClients, database, table);
     this.metaClients = metaClients;
@@ -82,8 +70,13 @@ protected void doRefresh() {
       String tableType = table.getParameters().get(TABLE_TYPE_PROP);
 
       if (tableType == null || !tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
-        // if table type is not Iceberg, do not throw an error. Instead just continue, current metadata will still be
-        // null so further operations should work correctly
+        // [LINKEDIN] If the table type is not Iceberg, there is no Iceberg metadata for the table yet. So do not
+        // throw an error; instead just continue: currentMetadata will remain null, exactly as doRefresh would leave
+        // it if the table did not exist, and further operations should work correctly.
+
+        // throw new IllegalArgumentException(String.format("Type of %s.%s is %s, not %s",
+        //     database, tableName,
+        //     tableType /* actual type */, ICEBERG_TABLE_TYPE_VALUE /* expected type */));
       } else {
         metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
         if (metadataLocation == null) {
@@ -123,6 +116,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       lockId = Optional.of(acquireLock());
       // TODO add lock heart beating for cases where default lock timeout is too low.
       Table tbl;
+      // [LINKEDIN] Instead of checking base != null to determine table existence, we query the metastore directly:
+      // base can be null when no Iceberg metadata exists yet but the Hive table does, in which case we want the
+      // current table definition rather than a newly created one.
       boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
       if (tableExists) {
         tbl = metaClients.run(client -> client.getTable(database, tableName));
@@ -143,18 +139,24 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
         tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
       }
 
+      // [LINKEDIN] Do not touch the Hive schema of the table; only modify the Iceberg-specific properties
+      // tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
       final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
-      if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
-        String errMsg = String.format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
-            currentMetadataLocation(), metadataLocation, database, tableName);
-        throw new CommitFailedException(errMsg);
+      String baseMetadataLocation = base != null ? base.file().location() : null;
+      if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+        throw new CommitFailedException(
+            "Base metadata location '%s' is not the same as the current table metadata location '%s' for %s.%s",
+            baseMetadataLocation, metadataLocation, database, tableName);
       }
 
       setParameters(newMetadataLocation, tbl);
 
       if (tableExists) {
         metaClients.run(client -> {
-          client.alter_table(database, tableName, tbl);
+          EnvironmentContext envContext = new EnvironmentContext(
+              ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
+          );
+          client.alter_table(database, tableName, tbl, envContext);
           return null;
         });
       } else {
@@ -164,6 +166,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
         });
       }
       threw = false;
+    } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+      throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
+
     } catch (TException | UnknownHostException e) {
       if (e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
         throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't " +
@@ -175,6 +180,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException("Interrupted during commit", e);
+
     } finally {
       if (threw) {
         // if anything went wrong, clean up the uncommitted metadata file
@@ -183,76 +189,4 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       unlock(lockId);
     }
   }
-
-  private void setParameters(String newMetadataLocation, Table tbl) {
-    Map<String, String> parameters = tbl.getParameters();
-
-    if (parameters == null) {
-      parameters = new HashMap<>();
-    }
-
-    parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
-    parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
-
-    if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
-      parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
-    }
-
-    tbl.setParameters(parameters);
-  }
-
-  private StorageDescriptor storageDescriptor(TableMetadata metadata) {
-
-    final StorageDescriptor storageDescriptor = new StorageDescriptor();
-    storageDescriptor.setCols(columns(metadata.schema()));
-    storageDescriptor.setLocation(metadata.location());
-    storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
-    storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
-    SerDeInfo serDeInfo = new SerDeInfo();
-    serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
-    storageDescriptor.setSerdeInfo(serDeInfo);
-    return storageDescriptor;
-  }
-
-  private List<FieldSchema> columns(Schema schema) {
-    return schema.columns().stream()
-        .map(col -> new FieldSchema(col.name(), HiveTypeConverter.convert(col.type()), ""))
-        .collect(Collectors.toList());
-  }
-
-  private long acquireLock() throws UnknownHostException, TException, InterruptedException {
-    final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent), System.getProperty("user.name"),
-        InetAddress.getLocalHost().getHostName());
-    LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
-    LockState state = lockResponse.getState();
-    long lockId = lockResponse.getLockid();
-    //TODO add timeout
-    while (state.equals(LockState.WAITING)) {
-      lockResponse = metaClients.run(client -> client.checkLock(lockId));
-      state = lockResponse.getState();
-      Thread.sleep(50);
-    }
-
-    if (!state.equals(LockState.ACQUIRED)) {
-      throw new CommitFailedException(
-          String.format("Could not acquire the lock on %s.%s, " + "lock request ended in state %s", database,
-              tableName, state));
-    }
-    return lockId;
-  }
-
-  private void unlock(Optional<Long> lockId) {
-    if (lockId.isPresent()) {
-      try {
-        metaClients.run(client -> {
-          client.unlock(lockId.get());
-          return null;
-        });
-      } catch (Exception e) {
-        throw new RuntimeException(String.format("Failed to unlock %s.%s", database, tableName), e);
-      }
-    }
-  }
 }
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 8685783423..d25fcf9ae6 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -210,7 +210,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     }
   }
 
-  private void setParameters(String newMetadataLocation, Table tbl) {
+  void setParameters(String newMetadataLocation, Table tbl) {
     Map<String, String> parameters = tbl.getParameters();
 
     if (parameters == null) {
@@ -227,7 +227,7 @@ private void setParameters(String newMetadataLocation, Table tbl) {
     tbl.setParameters(parameters);
   }
 
-  private StorageDescriptor storageDescriptor(TableMetadata metadata) {
+  StorageDescriptor storageDescriptor(TableMetadata metadata) {
 
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
     storageDescriptor.setCols(columns(metadata.schema()));
@@ -246,7 +246,7 @@ private List<FieldSchema> columns(Schema schema) {
         .collect(Collectors.toList());
   }
 
-  private long acquireLock() throws UnknownHostException, TException, InterruptedException {
+  long acquireLock() throws UnknownHostException, TException, InterruptedException {
     final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
     lockComponent.setTablename(tableName);
     final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
@@ -285,7 +285,7 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE
     return lockId;
   }
 
-  private void unlock(Optional<Long> lockId) {
+  void unlock(Optional<Long> lockId) {
     if (lockId.isPresent()) {
       try {
         metaClients.run(client -> {
diff --git a/hive/src/test/java/org/apache/iceberg/hive/TestCustomHiveCatalog.java b/hive/src/test/java/org/apache/iceberg/hive/TestHiveMetadataPreservingCatalog.java
similarity index 97%
rename from hive/src/test/java/org/apache/iceberg/hive/TestCustomHiveCatalog.java
rename to hive/src/test/java/org/apache/iceberg/hive/TestHiveMetadataPreservingCatalog.java
index 5377ce80c2..788d481f9b 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/TestCustomHiveCatalog.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/TestHiveMetadataPreservingCatalog.java
@@ -44,7 +44,7 @@
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-public class TestCustomHiveCatalog extends HiveMetastoreTest {
+public class TestHiveMetadataPreservingCatalog extends HiveMetastoreTest {
 
   private static final String TABLE_NAME = "tbl";
   private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
@@ -60,7 +60,7 @@ public class TestCustomHiveCatalog extends HiveMetastoreTest {
 
   @BeforeClass
   public static void setCustomHiveCatalog() {
-    catalog = new CustomHiveCatalog(hiveConf);
+    catalog = new HiveMetadataPreservingCatalog(hiveConf);
   }
 
   @Before
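
Note, outside the patch: the deprecated entry point keeps working through the delegation added above, so existing callers can migrate at their own pace. A minimal before/after sketch, assuming a Configuration named conf that points at the target metastore:

    // Deprecated: delegates to the renamed loader, so behavior is unchanged.
    HiveCatalog legacy = HiveCatalogs.loadCustomCatalog(conf);

    // Preferred: the new, self-describing entry point added in this patch.
    HiveCatalog preserving = HiveCatalogs.loadHiveMetadataPreservingCatalog(conf);

Both calls go through the same HIVE_METADATA_PRESERVING_CATALOG_CACHE keyed by metastore URI, so they return the same cached instance.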
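
A sketch of the publisher flow the new catalog is meant for, assuming a Hive table db.events already exists in the metastore; the class name, identifier, and schema below are illustrative only, not part of the patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.hive.HiveCatalogs;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.required;

    public class PublishIcebergMetadataExample {
      public static void main(String[] args) {
        // Catalog instances are cached per metastore URI, mirroring loadCatalog.
        HiveConf conf = new HiveConf();
        HiveCatalog catalog = HiveCatalogs.loadHiveMetadataPreservingCatalog(conf);

        // createTable commits with base == null. Because the preserving doCommit
        // checks the metastore for table existence instead of base != null, it
        // fetches the existing Hive table, sets only the Iceberg properties
        // (table_type, metadata_location, ...), and leaves the Hive schema and
        // stats untouched.
        Schema schema = new Schema(required(1, "id", Types.LongType.get()));
        catalog.createTable(TableIdentifier.of("db", "events"), schema);
      }
    }

If the table does not exist yet, the same call falls through to the create branch of doCommit and defines a new external table, as in the stock HiveTableOperations.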