From b70a1e6920330fb5a70a58810a150dd7d1e505c2 Mon Sep 17 00:00:00 2001 From: Zhike Chen Date: Mon, 27 Jun 2022 15:24:16 +0800 Subject: [PATCH 1/2] add updateTableSerDeInfo for HiveSyncTool --- .../org/apache/hudi/hive/HiveSyncTool.java | 3 ++ .../hudi/hive/HoodieHiveSyncClient.java | 37 +++++++++++++++++++ .../sync/common/HoodieMetaSyncOperations.java | 6 +++ 3 files changed, 46 insertions(+) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 0374686b7166b..8e0c0d6107e7d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -290,6 +290,9 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea // Sync the table properties if the schema has changed if (config.getString(HIVE_TABLE_PROPERTIES) != null || config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { syncClient.updateTableProperties(tableName, tableProperties); + HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); + String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); + syncClient.updateTableSerDeInfo(tableName, serDeFormatClassName, serdeProperties); LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties); } schemaChanged = true; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index d5a85adcbacc2..f12684251597c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.log4j.LogManager; @@ -128,6 +130,41 @@ public void updateTableProperties(String tableName, Map tablePro } } + /** + * Update the table serde properties to the table. + */ + @Override + public void updateTableSerDeInfo(String tableName, String serdeClass, Map serdeProperties) { + if (serdeProperties == null || serdeProperties.isEmpty()) { + return; + } + try { + Table table = client.getTable(databaseName, tableName); + serdeProperties.put("serialization.format", "1"); + StorageDescriptor storageDescriptor = table.getSd(); + SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo(); + if (serdeInfo != null && serdeInfo.getParametersSize() == serdeProperties.size()) { + Map parameters = serdeInfo.getParameters(); + boolean same = true; + for (String key : serdeProperties.keySet()) { + if (!parameters.containsKey(key) | !parameters.get(key).equals(serdeProperties.get(key))) { + same = false; + break; + } + } + if (same) { + LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update"); + return; + } + } + storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties)); + client.alter_table(databaseName, tableName, table); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to update table serde info for table: " + + tableName, e); + } + } + @Override public void updateTableSchema(String tableName, MessageType newSchema) { ddlExecutor.updateTableDefinition(tableName, newSchema); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java index 1c16dd13edaa4..5afcf80a877e8 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java @@ -173,6 +173,12 @@ default void updateTableProperties(String tableName, Map tablePr } + /** + * Update the table SerDeInfo in metastore. + */ + default void updateTableSerDeInfo(String tableName, String serdeClass, Map serdeProperties) { + } + /** * Get the timestamp of last replication. */ From 2be98deb1d10d00a627fffabeabc318cb045981f Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 19 Sep 2022 15:53:17 +0530 Subject: [PATCH 2/2] Add test for serde info --- .../java/org/apache/hudi/hive/HoodieHiveSyncClient.java | 7 +++++++ .../test/java/org/apache/hudi/hive/TestHiveSyncTool.java | 3 +++ 2 files changed, 10 insertions(+) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index f12684251597c..a740c93d65af1 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -353,4 +353,11 @@ public void updateTableComments(String tableName, List fromMetastor } } + Table getTable(String tableName) { + try { + return client.getTable(databaseName, tableName); + } catch (TException e) { + throw new HoodieHiveSyncException(String.format("Database: %s, Table: %s does not exist", databaseName, tableName), e); + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 0673e08489ef7..8dcfc1675883d 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -159,6 +159,9 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); + assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(), + "SerDe info not updated or does not match"); assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getStorageSchema().getColumns().size() + 1, "Hive Schema should match the table schema + partition field");