HiveTableOperations.java
@@ -186,7 +186,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
             baseMetadataLocation, metadataLocation, database, tableName);
       }
 
-      setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
+      setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), hiveEngineEnabled);
 
       persistTable(tbl, updateHiveTable);
       threw = false;
@@ -257,13 +257,17 @@ private Table newHmsTable() {
     return newTable;
   }
 
-  private void setParameters(String newMetadataLocation, Table tbl, boolean hiveEngineEnabled) {
+  private void setHmsTableParameters(String newMetadataLocation, Table tbl, Map<String, String> icebergTableProps,
+      boolean hiveEngineEnabled) {
     Map<String, String> parameters = tbl.getParameters();
 
     if (parameters == null) {
       parameters = new HashMap<>();
     }
 
+    // push all Iceberg table properties into HMS
+    icebergTableProps.forEach(parameters::put);
+
     parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
     parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
 
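
Note: the merge order in setHmsTableParameters matters. The Iceberg table properties are copied into the HMS parameter map first, and the reserved bookkeeping keys (TABLE_TYPE_PROP, METADATA_LOCATION_PROP) are written afterwards, so they win on any collision. Below is a minimal standalone sketch of that precedence; the class, the sample property values, and the literal key strings ("table_type", "metadata_location") are illustrative stand-ins, not the Iceberg class itself.

    import java.util.HashMap;
    import java.util.Map;

    // Standalone illustration of the merge order in setHmsTableParameters above.
    public class HmsParameterMergeSketch {
      public static void main(String[] args) {
        Map<String, String> icebergTableProps = new HashMap<>();
        icebergTableProps.put("write.format.default", "parquet"); // hypothetical user property
        icebergTableProps.put("table_type", "not-iceberg");       // would collide with a reserved key

        Map<String, String> parameters = new HashMap<>();

        // push all Iceberg table properties into HMS (same call shape as the diff)
        icebergTableProps.forEach(parameters::put);

        // reserved keys are written last, so they overwrite colliding user properties
        parameters.put("table_type", "ICEBERG");
        parameters.put("metadata_location", "file:/tmp/metadata/00001.metadata.json");

        System.out.println(parameters.get("table_type"));           // ICEBERG
        System.out.println(parameters.get("write.format.default")); // parquet
      }
    }
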
TestHiveMetastore.java
@@ -38,9 +38,11 @@
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
 import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.hadoop.Util;
+import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -154,10 +156,6 @@ public HiveConf hiveConf() {
     return hiveConf;
   }
 
-  public HiveClientPool clientPool() {
-    return clientPool;
-  }
-
   public String getDatabasePath(String dbName) {
     File dbDir = new File(hiveLocalDir, dbName + ".db");
     return dbDir.getPath();
@@ -191,6 +189,10 @@ public void reset() throws Exception {
     }
   }
 
+  public Table getTable(String dbName, String tableName) throws TException, InterruptedException {
+    return clientPool.run(client -> client.getTable(dbName, tableName));
+  }
+
   private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf) throws Exception {
     HiveConf serverConf = new HiveConf(conf);
     serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true");
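
Note: the new getTable helper replaces the removed clientPool() accessor for assertions: tests fetch the raw HMS table instead of reaching into the pool. A sketch of the intended usage, assuming a JUnit 4 test with a TestHiveMetastore fixture already started elsewhere and a hypothetical Iceberg table default.customers committed through the code path above:

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.junit.Assert;
    import org.junit.Test;

    public class HmsTableParametersTest {
      // Assumed fixture: started and populated in test setup, not shown here.
      private static TestHiveMetastore metastore;

      @Test
      public void testIcebergPropertiesReachHms() throws Exception {
        Table hmsTable = metastore.getTable("default", "customers");
        Map<String, String> parameters = hmsTable.getParameters();

        // the commit path should have pushed the Iceberg property into HMS
        Assert.assertEquals("parquet", parameters.get("write.format.default"));
        Assert.assertEquals("ICEBERG", parameters.get("table_type"));
      }
    }
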
HiveIcebergMetaHook.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
@@ -51,10 +50,15 @@
 public class HiveIcebergMetaHook implements HiveMetaHook {
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
   private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
-      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME);
+      .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME);
   private static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet
-      .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL",
-          "bucketing_version");
+      // We don't want to push down the metadata location props to Iceberg from HMS,
+      // since the snapshot pointer in HMS would always be one step ahead
+      .of(BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+          BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP,
+          // Initially we'd like to cache the partition spec in HMS, but not push it down later to Iceberg during
+          // alter table commands since by then the HMS info can be stale + Iceberg does not store its partition spec in the props
+          InputFormatConfig.PARTITION_SPEC);
 
   private final Configuration conf;
   private Table icebergTable = null;
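
Note: PROPERTIES_TO_REMOVE now filters the opposite direction from the commit path in HiveTableOperations: it strips keys from the HMS parameters before they are handed to Iceberg, so the lagging HMS snapshot pointer and the cached partition spec never clobber Iceberg's own state. A standalone sketch of that filtering; the first two key strings match the values of the BaseMetastoreTableOperations constants, while the third is an illustrative stand-in for InputFormatConfig.PARTITION_SPEC:

    import java.util.Properties;
    import java.util.Set;

    // Illustration of the HMS-to-Iceberg filtering direction.
    public class CatalogPropertyFilterSketch {
      private static final Set<String> PROPERTIES_TO_REMOVE = Set.of(
          "metadata_location", "previous_metadata_location", "partition.spec.json");

      public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("metadata_location", "file:/tmp/metadata/00002.metadata.json");
        properties.put("write.format.default", "parquet");

        PROPERTIES_TO_REMOVE.forEach(properties::remove);
        System.out.println(properties); // only write.format.default survives
      }
    }
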
@@ -179,10 +183,10 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
   /**
    * Calculates the properties we would like to send to the catalog.
    * <ul>
-   * <li>The base of the properties is the properties store at the Hive Metastore for the given table
+   * <li>The base of the properties is the properties stored at the Hive Metastore for the given table
    * <li>We add the {@link Catalogs#LOCATION} as the table location
    * <li>We add the {@link Catalogs#NAME} as TableIdentifier defined by the database name and table name
-   * <li>We remove the Hive Metastore specific parameters
+   * <li>We remove some parameters that we don't want to push down to the Iceberg table props
    * </ul>
    * @param hmsTable Table for which we are calculating the properties
    * @return The properties we can provide for Iceberg functions, like {@link Catalogs}
@@ -200,7 +204,7 @@ private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.
       properties.put(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
     }
 
-    // Remove creation related properties
+    // Remove HMS table parameters we don't want to propagate to Iceberg
    PROPERTIES_TO_REMOVE.forEach(properties::remove);
 
     return properties;
@@ -224,11 +228,11 @@ private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.ap
   private static PartitionSpec spec(Schema schema, Properties properties,
       org.apache.hadoop.hive.metastore.api.Table hmsTable) {
 
-    if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) {
+    if (hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) != null) {
       Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
           "Provide only one of the following: Hive partition specification, or the " +
           InputFormatConfig.PARTITION_SPEC + " property");
-      return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC));
+      return PartitionSpecParser.fromJson(schema, hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC));
     } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
       // If the table is partitioned then generate the identity partition definitions for the Iceberg table
       return HiveSchemaUtil.spec(schema, hmsTable.getPartitionKeys());
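
Note: spec() now reads the partition spec JSON from the live HMS table parameters rather than the merged catalog Properties, which can be stale during alter table. A sketch of the full resolution order; the helper class and the literal key string are illustrative (the real key lives in InputFormatConfig.PARTITION_SPEC), and the identity-partition branch mirrors what HiveSchemaUtil.spec produces:

    import java.util.List;
    import java.util.Map;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.PartitionSpecParser;
    import org.apache.iceberg.Schema;

    public class SpecResolutionSketch {
      // Illustrative stand-in for InputFormatConfig.PARTITION_SPEC.
      private static final String PARTITION_SPEC_KEY = "iceberg.mr.table.partition.spec";

      // Mirrors the decision tree in spec(): the HMS-cached JSON spec wins, then Hive
      // partition columns become identity partitions, otherwise unpartitioned.
      static PartitionSpec resolve(Schema schema, Map<String, String> hmsParameters,
                                   List<String> hivePartitionColumnNames) {
        String specJson = hmsParameters.get(PARTITION_SPEC_KEY);
        if (specJson != null) {
          return PartitionSpecParser.fromJson(schema, specJson);
        } else if (!hivePartitionColumnNames.isEmpty()) {
          PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
          hivePartitionColumnNames.forEach(builder::identity);
          return builder.build();
        }
        return PartitionSpec.unpartitioned();
      }
    }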