8 changes: 8 additions & 0 deletions hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -56,6 +56,14 @@ public HiveCatalog(Configuration conf) {
this.closed = false;
}

protected HiveClientPool clientPool() {
return clients;
}

protected Configuration conf() {
return conf;
}

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(namespace.levels().length == 1,
19 changes: 14 additions & 5 deletions hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
@@ -33,10 +33,11 @@ public final class HiveCatalogs {
.removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
.build();

private static final Cache<String, CustomHiveCatalog> CUSTOM_CATALOG_CACHE = Caffeine.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
.build();
private static final Cache<String, HiveMetadataPreservingCatalog> HIVE_METADATA_PRESERVING_CATALOG_CACHE =
Caffeine.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
.build();

private HiveCatalogs() {}

@@ -46,9 +47,17 @@ public static HiveCatalog loadCatalog(Configuration conf) {
return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf));
}

/**
* @deprecated Use {@link #loadHiveMetadataPreservingCatalog(Configuration)} instead
*/
@Deprecated
public static HiveCatalog loadCustomCatalog(Configuration conf) {
return loadHiveMetadataPreservingCatalog(conf);
}

public static HiveCatalog loadHiveMetadataPreservingCatalog(Configuration conf) {
// metastore URI can be null in local mode
String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
return CUSTOM_CATALOG_CACHE.get(metastoreUri, uri -> new CustomHiveCatalog(conf));
return HIVE_METADATA_PRESERVING_CATALOG_CACHE.get(metastoreUri, uri -> new HiveMetadataPreservingCatalog(conf));
}
}
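
For readers picking up the rename: a minimal sketch of loading the catalog through the new entry point. The wrapper class and the metastore URI are illustrative placeholders; the caching behavior is per the diff above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;

public class LoadCatalogSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // placeholder URI; in local mode METASTOREURIS can be left unset
    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://metastore:9083");
    // instances are cached per metastore URI, so repeated calls reuse one catalog
    HiveCatalog catalog = HiveCatalogs.loadHiveMetadataPreservingCatalog(conf);
    // the deprecated loadCustomCatalog(conf) delegates to the same method
  }
}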
hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingCatalog.java
@@ -25,25 +25,21 @@


/**
* A {@link HiveCatalog} which uses {@link CustomHiveTableOperations} underneath.
* A {@link HiveCatalog} which uses {@link HiveMetadataPreservingTableOperations} underneath.
*
* This catalog is especially useful if the Iceberg operations are being performed in response to Hive operations
 * This catalog should only be used by metadata publishers wanting to publish/update Iceberg metadata for an existing
 * Hive table while preserving the current Hive metadata.
*/
public class CustomHiveCatalog extends HiveCatalog {
public class HiveMetadataPreservingCatalog extends HiveCatalog {

private final HiveClientPool clients;
private final Configuration conf;

public CustomHiveCatalog(Configuration conf) {
public HiveMetadataPreservingCatalog(Configuration conf) {
super(conf);
this.clients = new HiveClientPool(2, conf);
this.conf = conf;
}

@Override
public TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
return new CustomHiveTableOperations(conf, clients, dbName, tableName);
return new HiveMetadataPreservingTableOperations(conf(), clientPool(), dbName, tableName);
}
}
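
A minimal usage sketch, assuming the standard Catalog.loadTable path; the wrapper class and the database/table names are placeholders. Because newTableOps is overridden above, refresh and commit for loaded tables go through HiveMetadataPreservingTableOperations.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveMetadataPreservingCatalog;

public class PreservingCatalogSketch {
  public static void main(String[] args) {
    HiveMetadataPreservingCatalog catalog =
        new HiveMetadataPreservingCatalog(new Configuration());
    // loadTable resolves table operations via the overridden newTableOps,
    // so commits on this table preserve the existing Hive schema and stats
    Table table = catalog.loadTable(TableIdentifier.of("db", "existing_hive_table"));
  }
}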
hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java
@@ -19,32 +19,20 @@

package org.apache.iceberg.hive;

import com.google.common.collect.Lists;
import java.net.InetAddress;
import com.google.common.collect.ImmutableMap;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.thrift.TException;
@@ -61,12 +49,12 @@
* This behaviour is useful if the Iceberg metadata is being generated/updated in response to Hive metadata being
* updated.
*/
public class CustomHiveTableOperations extends HiveTableOperations {
public class HiveMetadataPreservingTableOperations extends HiveTableOperations {
private final HiveClientPool metaClients;
private final String database;
private final String tableName;

protected CustomHiveTableOperations(Configuration conf, HiveClientPool metaClients, String database,
protected HiveMetadataPreservingTableOperations(Configuration conf, HiveClientPool metaClients, String database,
String table) {
super(conf, metaClients, database, table);
this.metaClients = metaClients;
@@ -82,8 +70,13 @@ protected void doRefresh() {
String tableType = table.getParameters().get(TABLE_TYPE_PROP);

if (tableType == null || !tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
// if table type is not Iceberg, do not throw an error. Instead just continue, current metadata will still be
// null so further operations should work correctly
// [LINKEDIN] If the table type is not Iceberg, there is no Iceberg metadata for the table yet.
// Do not throw an error; just continue. currentMetadata remains null, which is what doRefresh
// would do if the table did not exist, so further operations still work correctly.

// throw new IllegalArgumentException(String.format("Type of %s.%s is %s, not %s",
// database, tableName,
// tableType /* actual type */, ICEBERG_TABLE_TYPE_VALUE /* expected type */));
} else {
metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
if (metadataLocation == null) {
@@ -123,6 +116,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
lockId = Optional.of(acquireLock());
// TODO add lock heart beating for cases where default lock timeout is too low.
Table tbl;
// [LINKEDIN] Instead of checking base != null to determine table existence, we query the metastore directly.
// base can be null when no Iceberg metadata exists but the Hive table does; in that case we want to reuse
// the current table definition rather than create a new one.
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
if (tableExists) {
tbl = metaClients.run(client -> client.getTable(database, tableName));
@@ -143,18 +139,24 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
}

// [LINKEDIN] Do not touch the Hive schema of the table, just modify Iceberg specific properties
// tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
String errMsg = String.format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
currentMetadataLocation(), metadataLocation, database, tableName);
throw new CommitFailedException(errMsg);
String baseMetadataLocation = base != null ? base.file().location() : null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
throw new CommitFailedException(
"Base metadata location '%s' is not the same as the current table metadata location '%s' for %s.%s",
baseMetadataLocation, metadataLocation, database, tableName);
}

setParameters(newMetadataLocation, tbl);

if (tableExists) {
metaClients.run(client -> {
client.alter_table(database, tableName, tbl);
EnvironmentContext envContext = new EnvironmentContext(
ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
);
client.alter_table(database, tableName, tbl, envContext);
return null;
});
} else {
@@ -164,6 +166,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
});
}
threw = false;
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);

} catch (TException | UnknownHostException e) {
if (e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't " +
@@ -175,6 +180,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during commit", e);

} finally {
if (threw) {
// if anything went wrong, clean up the uncommitted metadata file
@@ -183,76 +189,4 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
unlock(lockId);
}
}

private void setParameters(String newMetadataLocation, Table tbl) {
Map<String, String> parameters = tbl.getParameters();

if (parameters == null) {
parameters = new HashMap<>();
}

parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);

if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
}

tbl.setParameters(parameters);
}

private StorageDescriptor storageDescriptor(TableMetadata metadata) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(columns(metadata.schema()));
storageDescriptor.setLocation(metadata.location());
storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
storageDescriptor.setSerdeInfo(serDeInfo);
return storageDescriptor;
}

private List<FieldSchema> columns(Schema schema) {
return schema.columns().stream()
.map(col -> new FieldSchema(col.name(), HiveTypeConverter.convert(col.type()), ""))
.collect(Collectors.toList());
}

private long acquireLock() throws UnknownHostException, TException, InterruptedException {
final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent), System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
LockState state = lockResponse.getState();
long lockId = lockResponse.getLockid();
//TODO add timeout
while (state.equals(LockState.WAITING)) {
lockResponse = metaClients.run(client -> client.checkLock(lockId));
state = lockResponse.getState();
Thread.sleep(50);
}

if (!state.equals(LockState.ACQUIRED)) {
throw new CommitFailedException(
String.format("Could not acquire the lock on %s.%s, " + "lock request ended in state %s", database, tableName,
state));
}
return lockId;
}

private void unlock(Optional<Long> lockId) {
if (lockId.isPresent()) {
try {
metaClients.run(client -> {
client.unlock(lockId.get());
return null;
});
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to unlock %s.%s", database, tableName), e);
}
}
}
}
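
To isolate the key metastore interaction added in doCommit: a sketch, mirroring the alter_table call above, of altering a table without triggering a stats update. The helper class, method name, and use of HiveMetaStoreClient directly (rather than through HiveClientPool) are assumptions for illustration.

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

public class StatsPreservingAlterSketch {
  static void alterWithoutStatsUpdate(HiveMetaStoreClient client, Table tbl) throws TException {
    // DO_NOT_UPDATE_STATS tells the metastore not to recompute table stats,
    // which would otherwise clobber the Hive metadata this catalog preserves
    EnvironmentContext envContext = new EnvironmentContext(
        ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE));
    client.alter_table(tbl.getDbName(), tbl.getTableName(), tbl, envContext);
  }
}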
hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -210,7 +210,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
}

private void setParameters(String newMetadataLocation, Table tbl) {
void setParameters(String newMetadataLocation, Table tbl) {
Map<String, String> parameters = tbl.getParameters();

if (parameters == null) {
@@ -227,7 +227,7 @@ private void setParameters(String newMetadataLocation, Table tbl) {
tbl.setParameters(parameters);
}

private StorageDescriptor storageDescriptor(TableMetadata metadata) {
StorageDescriptor storageDescriptor(TableMetadata metadata) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(columns(metadata.schema()));
@@ -246,7 +246,7 @@ private List<FieldSchema> columns(Schema schema) {
.collect(Collectors.toList());
}

private long acquireLock() throws UnknownHostException, TException, InterruptedException {
long acquireLock() throws UnknownHostException, TException, InterruptedException {
final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
@@ -285,7 +285,7 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedException {
return lockId;
}

private void unlock(Optional<Long> lockId) {
void unlock(Optional<Long> lockId) {
if (lockId.isPresent()) {
try {
metaClients.run(client -> {
hive/src/test/java/org/apache/iceberg/hive/TestHiveMetadataPreservingCatalog.java
@@ -44,7 +44,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;


public class TestCustomHiveCatalog extends HiveMetastoreTest {
public class TestHiveMetadataPreservingCatalog extends HiveMetastoreTest {

private static final String TABLE_NAME = "tbl";
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
@@ -60,7 +60,7 @@ public class TestCustomHiveCatalog extends HiveMetastoreTest {

@BeforeClass
public static void setCustomHiveCatalog() {
catalog = new CustomHiveCatalog(hiveConf);
catalog = new HiveMetadataPreservingCatalog(hiveConf);
}

@Before