diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 05ca375b3f80..f03fa0b1014e 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -186,7 +186,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
             baseMetadataLocation, metadataLocation, database, tableName);
       }
 
-      setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
+      setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), hiveEngineEnabled);
 
       persistTable(tbl, updateHiveTable);
       threw = false;
@@ -257,13 +257,17 @@ private Table newHmsTable() {
     return newTable;
   }
 
-  private void setParameters(String newMetadataLocation, Table tbl, boolean hiveEngineEnabled) {
+  private void setHmsTableParameters(String newMetadataLocation, Table tbl, Map<String, String> icebergTableProps,
+      boolean hiveEngineEnabled) {
     Map<String, String> parameters = tbl.getParameters();
 
     if (parameters == null) {
       parameters = new HashMap<>();
     }
 
+    // push all Iceberg table properties into HMS
+    icebergTableProps.forEach(parameters::put);
+
     parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
     parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
index 1381b5495ae2..c57b80f7960c 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
@@ -38,9 +38,11 @@
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
 import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.hadoop.Util;
+import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -154,10 +156,6 @@ public HiveConf hiveConf() {
     return hiveConf;
   }
 
-  public HiveClientPool clientPool() {
-    return clientPool;
-  }
-
   public String getDatabasePath(String dbName) {
     File dbDir = new File(hiveLocalDir, dbName + ".db");
     return dbDir.getPath();
@@ -191,6 +189,10 @@ public void reset() throws Exception {
     }
   }
 
+  public Table getTable(String dbName, String tableName) throws TException, InterruptedException {
+    return clientPool.run(client -> client.getTable(dbName, tableName));
+  }
+
   private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf) throws Exception {
     HiveConf serverConf = new HiveConf(conf);
     serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true");
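The heart of the change is `setHmsTableParameters`: on every commit, all Iceberg table properties are mirrored into the HMS table parameters, and the Iceberg bookkeeping entries (`TABLE_TYPE_PROP`, `METADATA_LOCATION_PROP`) are written after the bulk copy so they win any key clash. A minimal standalone sketch of that merge order, with literal keys standing in for the real constants:

```java
import java.util.HashMap;
import java.util.Map;

public class ParameterMergeSketch {
  // Hypothetical stand-ins for the BaseMetastoreTableOperations constants.
  static final String TABLE_TYPE_PROP = "table_type";
  static final String METADATA_LOCATION_PROP = "metadata_location";

  static Map<String, String> merge(Map<String, String> hmsParams, Map<String, String> icebergProps,
      String newMetadataLocation) {
    Map<String, String> parameters = hmsParams == null ? new HashMap<>() : new HashMap<>(hmsParams);
    // push all Iceberg table properties into HMS; Iceberg's values overwrite stale HMS copies
    icebergProps.forEach(parameters::put);
    // the bookkeeping entries are written last, so a user property cannot shadow them
    parameters.put(TABLE_TYPE_PROP, "ICEBERG");
    parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
    return parameters;
  }

  public static void main(String[] args) {
    Map<String, String> hms = new HashMap<>();
    hms.put("custom_property", "old_val");
    Map<String, String> iceberg = new HashMap<>();
    iceberg.put("custom_property", "new_val");
    // prints a map containing custom_property=new_val, table_type=ICEBERG, metadata_location=/tmp/v2.metadata.json
    System.out.println(merge(hms, iceberg, "/tmp/v2.metadata.json"));
  }
}
```

In the same spirit, the `TestHiveMetastore` change replaces the leaked `clientPool()` accessor with a narrow `getTable` helper, so tests no longer manage pooled metastore clients themselves.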
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 586f6fc9420c..54c2247fc027 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
@@ -51,10 +50,15 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
   private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
-      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME);
+      .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME);
   private static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet
-      .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL",
-          "bucketing_version");
+      // We don't want to push down the metadata location props to Iceberg from HMS,
+      // since the snapshot pointer in HMS would always be one step ahead
+      .of(BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+          BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP,
+          // Initially we'd like to cache the partition spec in HMS, but not push it down later to Iceberg during alter
+          // table commands, since by then the HMS info can be stale and Iceberg does not store its spec in the props
+          InputFormatConfig.PARTITION_SPEC);
 
   private final Configuration conf;
   private Table icebergTable = null;
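Because properties now flow in both directions, `PROPERTIES_TO_REMOVE` guards the HMS-to-Iceberg direction: the HMS copy of the metadata pointers is always one commit ahead of the table being altered, and the cached partition spec can be stale. A hedged sketch of the filtering with literal keys (the two metadata keys match the `BaseMetastoreTableOperations` constants; the spec key is an assumed stand-in for `InputFormatConfig.PARTITION_SPEC`):

```java
import java.util.Properties;
import java.util.Set;

public class PropertyFilterSketch {
  static final Set<String> PROPERTIES_TO_REMOVE = Set.of(
      "metadata_location",                  // HMS pointer is one commit ahead of the table being altered
      "previous_metadata_location",         // same problem for the previous pointer
      "iceberg.mr.table.partition.spec");   // assumed key; cached at create time, may be stale on ALTER

  static Properties withoutHmsOnlyKeys(Properties fromHms) {
    Properties result = new Properties();
    result.putAll(fromHms);
    PROPERTIES_TO_REMOVE.forEach(result::remove);
    return result;
  }
}
```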
@@ -179,10 +183,10 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
   /**
    * Calculates the properties we would like to send to the catalog.
    * <ul>
-   * <li>The base of the properties is the properties store at the Hive Metastore for the given table
+   * <li>The base of the properties is the properties stored at the Hive Metastore for the given table
    * <li>We add the {@link Catalogs#LOCATION} as the table location
    * <li>We add the {@link Catalogs#NAME} as TableIdentifier defined by the database name and table name
-   * <li>We remove the Hive Metastore specific parameters
+   * <li>We remove some parameters that we don't want to push down to the Iceberg table props
    * </ul>
    * @param hmsTable Table for which we are calculating the properties
    * @return The properties we can provide for Iceberg functions, like {@link Catalogs}
@@ -200,7 +204,7 @@ private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.
       properties.put(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
     }
 
-    // Remove creation related properties
+    // Remove HMS table parameters we don't want to propagate to Iceberg
     PROPERTIES_TO_REMOVE.forEach(properties::remove);
 
     return properties;
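Restating the javadoc list above as code: the catalog properties start from a copy of the HMS parameters, gain location and name, and lose the keys in `PROPERTIES_TO_REMOVE`. A simplified sketch (the null check on the storage-descriptor location is elided; `LOCATION_KEY` and `NAME_KEY` are stand-ins for the `Catalogs` constants):

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class CatalogPropertiesSketch {
  static final String LOCATION_KEY = "location";  // stand-in for Catalogs.LOCATION
  static final String NAME_KEY = "name";          // stand-in for Catalogs.NAME

  static Properties catalogProperties(Map<String, String> hmsParams, String sdLocation,
      String dbName, String tableName, Set<String> propertiesToRemove) {
    Properties properties = new Properties();
    properties.putAll(hmsParams);                        // base: everything stored in HMS
    properties.put(LOCATION_KEY, sdLocation);            // from the HMS storage descriptor
    properties.put(NAME_KEY, dbName + "." + tableName);  // TableIdentifier.of(db, table).toString()
    propertiesToRemove.forEach(properties::remove);      // drop the HMS-only parameters
    return properties;
  }
}
```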
@@ -224,11 +228,11 @@ private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.ap
   private static PartitionSpec spec(Schema schema, Properties properties,
       org.apache.hadoop.hive.metastore.api.Table hmsTable) {
 
-    if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) {
+    if (hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) != null) {
       Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
           "Provide only one of the following: Hive partition specification, or the " +
               InputFormatConfig.PARTITION_SPEC + " property");
-      return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC));
+      return PartitionSpecParser.fromJson(schema, hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC));
     } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
       // If the table is partitioned then generate the identity partition definitions for the Iceberg table
       return HiveSchemaUtil.spec(schema, hmsTable.getPartitionKeys());
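With this change, `spec()` reads the cached spec JSON from the raw HMS table parameters rather than from the merged catalog properties, so the key filtered out by `PROPERTIES_TO_REMOVE` cannot resurface through a stale property. A sketch of the resulting resolution order, reusing the Iceberg calls visible in the hunk (the parameter key is passed in instead of referencing `InputFormatConfig`):

```java
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hive.HiveSchemaUtil;

public class SpecResolutionSketch {
  // Resolution order: explicit JSON spec cached in the HMS parameters, then Hive
  // partition columns mapped to identity partitions, then unpartitioned.
  static PartitionSpec resolve(Schema schema, Table hmsTable, String specJsonKey) {
    Map<String, String> params = hmsTable.getParameters();
    if (params != null && params.get(specJsonKey) != null) {
      return PartitionSpecParser.fromJson(schema, params.get(specJsonKey));
    } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
      return HiveSchemaUtil.spec(schema, hmsTable.getPartitionKeys());
    }
    return PartitionSpec.unpartitioned();
  }
}
```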
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 78655dcc6a84..7311bb828263 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -27,12 +27,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -43,6 +45,7 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -165,39 +168,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc
         icebergTable.schema().asStruct());
     Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec());
 
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable =
-        shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-    Map<String, String> hmsParams = hmsTable.getParameters();
-    IGNORED_PARAMS.forEach(hmsParams::remove);
-
-    // This is only set for HiveCatalog based tables. Check the value, then remove it so the other checks can be general
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertTrue(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
-          .startsWith(icebergTable.location()));
-      hmsParams.remove(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
-    }
-
-    // General metadata checks
-    Assert.assertEquals(6, hmsParams.size());
-    Assert.assertEquals("test", hmsParams.get("dummy"));
-    Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
-    Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
-    Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
-    Assert.assertEquals(HiveIcebergStorageHandler.class.getName(),
-        hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
-    Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(),
-        hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
-
-    // verify that storage descriptor is filled out with inputformat/outputformat/serde
-    Assert.assertEquals(HiveIcebergInputFormat.class.getName(), hmsTable.getSd().getInputFormat());
-    Assert.assertEquals(HiveIcebergOutputFormat.class.getName(), hmsTable.getSd().getOutputFormat());
-    Assert.assertEquals(HiveIcebergSerDe.class.getName(), hmsTable.getSd().getSerdeInfo().getSerializationLib());
-
     if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
       Assert.assertEquals(Collections.singletonMap("dummy", "test"), icebergTable.properties());
 
       shell.executeStatement("DROP TABLE customers");
 
       // Check if the table was really dropped even from the Catalog
@@ -207,13 +178,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc
         }
       );
     } else {
-      Map<String, String> expectedIcebergProperties = new HashMap<>(2);
-      expectedIcebergProperties.put("dummy", "test");
-      expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
-      Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
-
-      // Check the HMS table parameters
-      hmsTable = shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
+      org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
       Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
 
       // Drop the table
@@ -238,7 +203,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc
   }
 
   @Test
-  public void testCreateTableWithoutSpec() throws TException, InterruptedException {
+  public void testCreateTableWithoutSpec() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
 
     shell.executeStatement("CREATE EXTERNAL TABLE customers " +
@@ -250,26 +215,10 @@ public void testCreateTableWithoutSpec() throws TException, InterruptedException
     // Check the Iceberg table partition data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
     Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec());
-
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable =
-        shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-    Map<String, String> hmsParams = hmsTable.getParameters();
-    IGNORED_PARAMS.forEach(hmsParams::remove);
-
-    // Just check that the PartitionSpec is not set in the metadata
-    Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
-
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(6, hmsParams.size());
-    } else {
-      Assert.assertEquals(5, hmsParams.size());
-    }
   }
 
   @Test
-  public void testCreateTableWithUnpartitionedSpec() throws TException, InterruptedException {
+  public void testCreateTableWithUnpartitionedSpec() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
 
     // We need the location for HadoopTable based tests only
@@ -284,21 +233,6 @@ public void testCreateTableWithUnpartitionedSpec() throws TException, Interrupte
     // Check the Iceberg table partition data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
     Assert.assertEquals(SPEC, icebergTable.spec());
-
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable =
-        shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-    Map<String, String> hmsParams = hmsTable.getParameters();
-    IGNORED_PARAMS.forEach(hmsParams::remove);
-
-    // Just check that the PartitionSpec is not set in the metadata
-    Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(6, hmsParams.size());
-    } else {
-      Assert.assertEquals(5, hmsParams.size());
-    }
   }
 
   @Test
@@ -319,8 +253,7 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted
       testTables.loadTable(identifier);
     } else {
       // Check the HMS table parameters
-      org.apache.hadoop.hive.metastore.api.Table hmsTable =
-          shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
+      org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
       Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
 
       // Drop the table
@@ -377,13 +310,12 @@ public void testCreateTableError() {
   }
 
   @Test
-  public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException {
+  public void testCreateTableAboveExistingTable() throws IOException {
     // Create the Iceberg table
     testTables.createIcebergTable(shell.getHiveConf(), "customers", COMPLEX_SCHEMA, FileFormat.PARQUET,
         Collections.emptyList());
 
     if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      // In HiveCatalog we just expect an exception since the table is already exists
       AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
           "customers already exists", () -> {
@@ -394,24 +326,10 @@ public void testCreateTableAboveExistingTable() throws TException, IOException,
         }
       );
     } else {
+      // With other catalogs, table creation should succeed
       shell.executeStatement("CREATE EXTERNAL TABLE customers " +
           "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
           testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")));
-
-      // Check the HMS table parameters
-      org.apache.hadoop.hive.metastore.api.Table hmsTable =
-          shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-      Map<String, String> hmsParams = hmsTable.getParameters();
-      IGNORED_PARAMS.forEach(hmsParams::remove);
-
-      Assert.assertEquals(4, hmsParams.size());
-      Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
-      Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
-      Assert.assertEquals(HiveIcebergStorageHandler.class.getName(),
-          hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
-      Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(),
-          hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
     }
   }
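The new test below checks HMS parameters against exact expected values, so it first strips the nondeterministic entries (DDL time, stats flags, and similar) collected in `IGNORED_PARAMS`. The stream pattern it uses is worth seeing in isolation; a standalone version with an assumed ignore set:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ParamFilterSketch {
  static Map<String, String> withoutIgnored(Map<String, String> hmsParams, Set<String> ignored) {
    return hmsParams.entrySet().stream()
        .filter(e -> !ignored.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, String> params = Map.of("transient_lastDdlTime", "1600000000", "custom_property", "initial_val");
    // keeps only custom_property
    System.out.println(withoutIgnored(params, Set.of("transient_lastDdlTime")));
  }
}
```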
deserializer"}, rows.get(i)); } } + + @Test + public void testIcebergAndHmsTableProperties() throws TException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' %s" + + "TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s')", + testTables.locationForCreateTableSQL(identifier), // we need the location for HadoopTable based tests only + InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA), + InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC), + "custom_property", "initial_val")); + + + // Check the Iceberg table parameters + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + + Map expectedIcebergProperties = new HashMap<>(); + expectedIcebergProperties.put("custom_property", "initial_val"); + expectedIcebergProperties.put("EXTERNAL", "TRUE"); + expectedIcebergProperties.put("storage_handler", HiveIcebergStorageHandler.class.getName()); + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); + } + if (MetastoreUtil.hive3PresentOnClasspath()) { + expectedIcebergProperties.put("bucketing_version", "2"); + } + Assert.assertEquals(expectedIcebergProperties, icebergTable.properties()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers"); + Map hmsParams = hmsTable.getParameters() + .entrySet().stream() + .filter(e -> !IGNORED_PARAMS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(9, hmsParams.size()); + Assert.assertEquals("initial_val", hmsParams.get("custom_property")); + Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); + Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); + Assert.assertEquals("true", hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED)); + Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), + hmsParams.get(hive_metastoreConstants.META_TABLE_STORAGE)); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), + hmsParams.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), + getCurrentSnapshotForHiveCatalogTable(icebergTable)); + Assert.assertNull(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP)); + Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); + } else { + Assert.assertEquals(7, hmsParams.size()); + Assert.assertNull(hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED)); + } + + // Check HMS inputformat/outputformat/serde + Assert.assertEquals(HiveIcebergInputFormat.class.getName(), hmsTable.getSd().getInputFormat()); + Assert.assertEquals(HiveIcebergOutputFormat.class.getName(), hmsTable.getSd().getOutputFormat()); + Assert.assertEquals(HiveIcebergSerDe.class.getName(), hmsTable.getSd().getSerdeInfo().getSerializationLib()); + + // Add two new properties to the Iceberg table and update an existing one + icebergTable.updateProperties() + .set("new_prop_1", "true") + .set("new_prop_2", "false") + .set("custom_property", "new_val") + .commit(); + + // Refresh the HMS table to see if new Iceberg 
+    // Refresh the HMS table to see if new Iceberg properties got synced into HMS
+    hmsParams = shell.metastore().getTable("default", "customers").getParameters()
+        .entrySet().stream()
+        .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+      Assert.assertEquals(12, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+      Assert.assertEquals("true", hmsParams.get("new_prop_1"));
+      Assert.assertEquals("false", hmsParams.get("new_prop_2"));
+      Assert.assertEquals("new_val", hmsParams.get("custom_property"));
+      String prevSnapshot = getCurrentSnapshotForHiveCatalogTable(icebergTable);
+      icebergTable.refresh();
+      String newSnapshot = getCurrentSnapshotForHiveCatalogTable(icebergTable);
+      Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP), prevSnapshot);
+      Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), newSnapshot);
+    } else {
+      Assert.assertEquals(7, hmsParams.size());
+    }
+  }
+
+  private String getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) {
+    return ((BaseMetastoreTableOperations) ((BaseTable) icebergTable).operations()).currentMetadataLocation();
+  }
 }
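For completeness, a hedged sketch of the round trip the new test exercises, outside JUnit: commit a property change through the Iceberg API, re-read the HMS table, and confirm that the `metadata_location` parameter in HMS matches what the table's operations report. `fetchHmsTable` is a hypothetical stand-in for `shell.metastore().getTable(...)`:

```java
import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;

public class PropertySyncCheckSketch {
  static void assertMetadataLocationSynced(Table icebergTable,
      Supplier<org.apache.hadoop.hive.metastore.api.Table> fetchHmsTable) {
    // Commit a property change; for a HiveCatalog table this also rewrites the HMS parameters
    icebergTable.updateProperties().set("new_prop_1", "true").commit();

    // Re-read the HMS table after the commit and compare the synced pointer
    Map<String, String> hmsParams = fetchHmsTable.get().getParameters();
    String hmsLocation = hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
    String tableLocation =
        ((BaseMetastoreTableOperations) ((BaseTable) icebergTable).operations()).currentMetadataLocation();
    if (!tableLocation.equals(hmsLocation)) {
      throw new AssertionError("metadata_location not synced, HMS has: " + hmsLocation);
    }
  }
}
```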