From 5d6b8ae17dc1b67430dcf25ef5e47e6db826f3c7 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Sun, 12 Dec 2021 18:19:28 +0800 Subject: [PATCH 1/8] [HUDI-2990] Delete partitions without metadata sync to hms --- .../common/table/TableSchemaResolver.java | 19 ++++++ ...AlterHoodieTableDropPartitionCommand.scala | 20 ++++-- .../org/apache/hudi/dla/HoodieDLAClient.java | 5 ++ .../org/apache/hudi/hive/HiveSyncTool.java | 64 ++++++++++++------- .../apache/hudi/hive/HoodieHiveClient.java | 33 ++++++++-- .../org/apache/hudi/hive/ddl/DDLExecutor.java | 8 +++ .../apache/hudi/hive/ddl/HMSDDLExecutor.java | 19 ++++++ .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 5 ++ .../apache/hudi/hive/ddl/JDBCExecutor.java | 6 ++ .../apache/hudi/hive/TestHiveSyncTool.java | 50 +++++++++++++++ .../sync/common/AbstractSyncHoodieClient.java | 33 +++++++++- 11 files changed, 228 insertions(+), 34 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 51e3e273806c3..e1dba575bb334 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -414,6 +414,25 @@ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAd return latestSchema; } + + /** + * Get the latest commit's metadata. + */ + public HoodieCommitMetadata getLatestCommitMetadata() { + try { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + if (timeline.lastInstant().isPresent()) { + HoodieInstant instant = timeline.lastInstant().get(); + byte[] data = timeline.getInstantDetails(instant).get(); + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + } else { + return null; + } + } catch (Exception e) { + throw new HoodieException("Failed to get commit metadata", e); + } + } + /** * Read the parquet schema from a parquet file.
*/ diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index a3dfdd6a0d042..10c40ed064464 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -17,18 +17,19 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME - -import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} +import org.apache.hudi.hive.MultiPartKeysValueExtractor +import org.apache.hudi.hive.ddl.HiveSyncMode +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} case class AlterHoodieTableDropPartitionCommand( tableIdentifier: TableIdentifier, @@ -67,6 +68,7 @@ extends RunnableCommand { val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table) val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table) + val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") val partitionsToDelete = normalizedSpecs.map { spec => hoodieCatalogTable.partitionFields.map{ partitionColumn => val encodedPartitionValue = if (enableEncodeUrl) { @@ -82,6 +84,7 @@ extends RunnableCommand { }.mkString("/") }.mkString(",") + val enableHive = isEnableHive(sparkSession) withSparkConf(sparkSession, Map.empty) { Map( "path" -> hoodieCatalogTable.tableLocation, @@ -91,7 +94,16 @@ extends RunnableCommand { PARTITIONS_TO_DELETE.key -> partitionsToDelete, RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), - PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",") + PARTITIONPATH_FIELD.key -> partitionFields, + HIVE_SYNC_ENABLED.key -> enableHive.toString, + META_SYNC_ENABLED.key -> enableHive.toString, + HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), + HIVE_USE_JDBC.key -> "false", + HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"), + HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table, + HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", + HIVE_PARTITION_FIELDS.key -> partitionFields, + HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName ) } } diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index 20f94f01ef0b3..abf8147ef41ee 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ 
b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -287,6 +287,11 @@ public void updatePartitionsToTable(String tableName, List changedPartit } } + @Override + public void dropPartitionsToTable(String tableName, List partitionsToDelete) { + throw new UnsupportedOperationException("No support for dropPartitionsToTable"); + } + public Map, String> scanTablePartitions(String tableName) { String sql = constructShowPartitionSQL(tableName); Statement stmt = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 3bbaee1ed8bcb..fc670cf0face0 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -28,7 +28,6 @@ import org.apache.hudi.hive.util.ConfigUtils; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; - import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.sync.common.AbstractSyncTool; @@ -166,20 +165,28 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Check if the necessary table exists boolean tableExists = hoodieHiveClient.doesTableExist(tableName); - // Get the parquet schema for this table looking at the latest commit - MessageType schema = hoodieHiveClient.getDataSchema(); - - // Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt table, - // so we disable the syncAsSparkDataSourceTable here to avoid reading such kind of table - // by the data source way (which will use the HoodieBootstrapRelation). - // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logic. - if (hoodieHiveClient.isBootstrap() - && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ - && !readAsOptimized) { - cfg.syncAsSparkDataSourceTable = false; + // check if isDeletePartition + boolean isDeletePartition = hoodieHiveClient.isDeletePartition(); + + // check if schemaChanged + boolean schemaChanged = false; + + if (!isDeletePartition) { + // Get the parquet schema for this table looking at the latest commit + MessageType schema = hoodieHiveClient.getDataSchema(); + + // Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt table, + // so we disable the syncAsSparkDataSourceTable here to avoid reading such kind of table + // by the data source way (which will use the HoodieBootstrapRelation). + // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logic. + if (hoodieHiveClient.isBootstrap() + && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ + && !readAsOptimized) { + cfg.syncAsSparkDataSourceTable = false; + } + // Sync schema if needed + schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema); } - // Sync schema if needed - boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema); LOG.info("Schema sync complete. Syncing partitions for " + tableName); // Get the last time we successfully synced partitions @@ -192,7 +199,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, LOG.info("Storage partitions scan complete.
Found " + writtenPartitionsSince.size()); // Sync the partitions if needed - boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince); + boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDeletePartition); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!cfg.isConditionalSync || meetSyncConditions) { hoodieHiveClient.updateLastCommitTimeSynced(tableName); @@ -331,19 +338,32 @@ private Map getSparkSerdeProperties(boolean readAsOptimized) { * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). */ - private boolean syncPartitions(String tableName, List writtenPartitionsSince) { + private boolean syncPartitions(String tableName, List writtenPartitionsSince, boolean isDeletePartition) { boolean partitionsChanged; try { List hivePartitions = hoodieHiveClient.scanTablePartitions(tableName); List partitionEvents = - hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); + hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDeletePartition); + List newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD); - LOG.info("New Partitions " + newPartitions); - hoodieHiveClient.addPartitionsToTable(tableName, newPartitions); + if (!newPartitions.isEmpty()) { + LOG.info("New Partitions " + newPartitions); + hoodieHiveClient.addPartitionsToTable(tableName, newPartitions); + } + List updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE); - LOG.info("Changed Partitions " + updatePartitions); - hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions); - partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty(); + if (!updatePartitions.isEmpty()) { + LOG.info("Changed Partitions " + updatePartitions); + hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions); + } + + List dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP); + if (!dropPartitions.isEmpty()) { + LOG.info("Drop Partitions " + dropPartitions); + hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions); + } + + partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty(); } catch (Exception e) { throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 265ab750d5aee..cfac71923bca5 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -122,6 +122,14 @@ public void updatePartitionsToTable(String tableName, List changedPartit ddlExecutor.updatePartitionsToTable(tableName, changedPartitions); } + /** + * Partition path has changed - drop the following partitions. + */ + @Override + public void dropPartitionsToTable(String tableName, List partitionsToDelete) { + ddlExecutor.dropPartitionsToTable(tableName, partitionsToDelete); + } + /** * Update the table properties to the table. */ @@ -147,6 +155,14 @@ public void updateTableProperties(String tableName, Map tablePro * Generate a list of PartitionEvent based on the changes required. 
*/ List getPartitionEvents(List tablePartitions, List partitionStoragePartitions) { + return getPartitionEvents(tablePartitions, partitionStoragePartitions, false); + } + + /** + * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. + * Generate a list of PartitionEvent based on the changes required. + */ + List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDeletePartition) { Map paths = new HashMap<>(); for (Partition tablePartition : tablePartitions) { List hivePartitionValues = tablePartition.getValues(); @@ -161,12 +177,17 @@ List getPartitionEvents(List tablePartitions, List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - if (!storagePartitionValues.isEmpty()) { - String storageValue = String.join(", ", storagePartitionValues); - if (!paths.containsKey(storageValue)) { - events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); - } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { - events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + + if (isDeletePartition) { + events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); + } else { + if (!storagePartitionValues.isEmpty()) { + String storageValue = String.join(", ", storagePartitionValues); + if (!paths.containsKey(storageValue)) { + events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); + } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { + events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + } } } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java index 0e1e223aab551..97ee7ab4a534d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java @@ -81,5 +81,13 @@ public void createTable(String tableName, MessageType storageSchema, String inpu */ public void updatePartitionsToTable(String tableName, List changedPartitions); + /** + * Remove partitions for a given table. + * + * @param tableName + * @param partitionsToDelete + */ + public void dropPartitionsToTable(String tableName, List partitionsToDelete); + public void close(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 37aa54abd33b8..304bdc8dbfcd5 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -226,6 +226,25 @@ public void updatePartitionsToTable(String tableName, List changedPartit } } + @Override + public void dropPartitionsToTable(String tableName, List dropPartitions) { + if (dropPartitions.isEmpty()) { + LOG.info("No partitions to drop for " + tableName); + return; + } + + LOG.info("Drop partitions " + dropPartitions.size() + " on " + tableName); + try { + for (String dropPartition : dropPartitions) { + client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false); + LOG.info("Drop partition " + dropPartition + " on " + tableName); + } + } catch (TException e) { + LOG.error(syncConfig.databaseName + "." 
+ tableName + " drop partition failed", e); + throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e); + } + } + @Override public void close() { if (client != null) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index e2635eef0b8f5..99015cbcf0203 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -126,6 +126,11 @@ public Map getTableSchema(String tableName) { } } + @Override + public void dropPartitionsToTable(String tableName, List partitionsToDelete) { + throw new UnsupportedOperationException("No support for dropPartitionsToTable"); + } + @Override public void close() { if (metaStoreClient != null) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index 1603191c66947..de91d2362f130 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -32,6 +32,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -141,6 +142,11 @@ public Map getTableSchema(String tableName) { } } + @Override + public void dropPartitionsToTable(String tableName, List partitionsToDelete) { + throw new UnsupportedOperationException("No support for dropPartitionsToTable"); + } + @Override public void close() { try { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index d36727a571deb..ef986411d03e3 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -737,6 +737,56 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size()); } + @ParameterizedTest + @MethodSource("syncMode") + public void testDropPartitionKeySync(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; + String instantTime = "100"; + HiveTestUtil.createCOWTable(instantTime, 1, true); + HoodieHiveClient hiveClient = + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should not exist initially"); + // Let's do the sync + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + tool.syncHoodieTable(); + // we need to renew the hiveClient after tool.syncHoodieTable(), because it will close the hive + // session and lead to a connection retry; we can see an exception in the log.
+ hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), + "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes"); + assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.getDataSchema().getColumns().size() + 1, + "Hive Schema should match the table schema + partition field"); + assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); + + // Add new partitions + List newPartition = Arrays.asList("2050/01/01"); + hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList()); + assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "No new partition should be added"); + hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition); + assertEquals(2, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "New partition should be added"); + + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + tool.syncHoodieTable(); + + // Drop 1 partition. + ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName + + "` DROP PARTITION (`datestr`='2050-01-01')"); + + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + List hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); + assertEquals(1, hivePartitions.size(), + "Table should have 1 partition after dropping 1 partition"); + } + @ParameterizedTest @MethodSource("syncMode") public void testNonPartitionedSync(String syncMode) throws Exception {
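Note on the classification step: the sync-common diff below turns each partition written by the latest commit into an ADD, UPDATE, or DROP event. For orientation, a condensed, dependency-free sketch of that decision logic; it is simplified from the HoodieHiveClient hunk in this patch (the real code also URL-decodes partition values and resolves full storage paths), and the helper class here is illustrative, not part of the patch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionEventSketch {
  enum EventType { ADD, UPDATE, DROP }

  // hivePartitions: partition values -> storage location currently registered in the metastore.
  // writtenPartitions: relative partition paths touched by the latest commit.
  static List<String> classify(Map<String, String> hivePartitions, List<String> writtenPartitions,
                               String basePath, boolean isDropPartition) {
    List<String> events = new ArrayList<>();
    for (String partition : writtenPartitions) {
      if (isDropPartition) {
        // The latest commit was a DELETE_PARTITION: every partition it touched becomes a DROP event.
        events.add(EventType.DROP + " " + partition);
      } else if (!hivePartitions.containsKey(partition)) {
        events.add(EventType.ADD + " " + partition); // unknown to the metastore
      } else if (!hivePartitions.get(partition).equals(basePath + "/" + partition)) {
        events.add(EventType.UPDATE + " " + partition); // registered, but the location drifted
      }
    }
    return events;
  }

  public static void main(String[] args) {
    Map<String, String> hive = new HashMap<>();
    hive.put("2021/12/12", "/tmp/hudi_trips/2021/12/12");
    // Prints [DROP 2021/12/12]: a DELETE_PARTITION commit takes precedence over the ADD/UPDATE checks.
    System.out.println(classify(hive, Arrays.asList("2021/12/12"), "/tmp/hudi_trips", true));
  }
}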
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index ce4720ac00907..a5556b617fddf 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -20,16 +20,18 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; @@ -98,6 +100,8 @@ public abstract void createTable(String tableName, MessageType storageSchema, public abstract void updatePartitionsToTable(String tableName, List changedPartitions); public abstract void dropPartitionsToTable(String tableName, List partitionsToDelete); + public void updateTableProperties(String tableName, Map tableProperties) {} public abstract Map getTableSchema(String tableName); @@ -155,6 +159,27 @@ public MessageType getDataSchema() { } } + public boolean isDeletePartition() { + try { + HoodieCommitMetadata hoodieCommitMetadata; + if (withOperationField) { + hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata(); + } else { + hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata(); + } + if (hoodieCommitMetadata != null) { + if (hoodieCommitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION)) { + return true; + } + } else { + return false; + } + } catch (Exception e) { + throw new HoodieSyncException("Failed to get commit metadata", e); + } + return false; + } + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public List getPartitionsWrittenToSince(Option lastCommitTimeSynced) { if (!lastCommitTimeSynced.isPresent()) { @@ -224,7 +249,7 @@ private MessageType readSchemaFromLogFile(Option lastCompactionCo public static class PartitionEvent { public enum PartitionEventType { - ADD, UPDATE + ADD, UPDATE, DROP } public PartitionEventType eventType; @@ -242,5 +267,9 @@ public static PartitionEvent newPartitionAddEvent(String storagePartition) { public static PartitionEvent newPartitionUpdateEvent(String storagePartition) { return new PartitionEvent(PartitionEventType.UPDATE, storagePartition); } + + public static PartitionEvent newPartitionDropEvent(String storagePartition) { + return new PartitionEvent(PartitionEventType.DROP, storagePartition); + } } } From 550ba7889e0d4c553b5347f26c60e97a27844468 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 13 Dec 2021 09:19:13 +0800 Subject: [PATCH 2/8] [HUDI-2990] Delete partitions without metadata sync to hms --- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 99015cbcf0203..10d3c4727bdbb 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -127,8 +127,22 @@ public Map getTableSchema(String tableName) { } @Override - public void dropPartitionsToTable(String tableName, List partitionsToDelete) { - throw new UnsupportedOperationException("No support for dropPartitionsToTable"); + public void dropPartitionsToTable(String tableName, List dropPartitions) { + if (dropPartitions.isEmpty()) { + LOG.info("No partitions to drop for " + tableName); + return; + } + + LOG.info("Drop partitions " + dropPartitions.size() + " on " + tableName); + try { + for (String dropPartition : dropPartitions) { + metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false); + LOG.info("Drop partition " + dropPartition + " on " + tableName); + } + } catch (Exception e) { + LOG.error(config.databaseName + "." + tableName + " drop partition failed", e); + throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e); + } } @Override
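The drop decision above hinges on reading the latest completed commit's WriteOperationType. A minimal standalone sketch of that probe, using only the Hudi APIs that already appear in these hunks (timeline, instant details, commit metadata); the class name and wiring are illustrative assumptions, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class DropPartitionProbe {
  // Mirrors AbstractSyncHoodieClient#isDeletePartition: true when the latest
  // completed commit on the timeline was a DELETE_PARTITION operation.
  public static boolean lastCommitDroppedPartitions(String basePath, Configuration conf) throws Exception {
    HoodieTableMetaClient metaClient =
        HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    if (!timeline.lastInstant().isPresent()) {
      return false; // no completed commits yet, so nothing to drop
    }
    byte[] details = timeline.getInstantDetails(timeline.lastInstant().get()).get();
    HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class);
    return WriteOperationType.DELETE_PARTITION.equals(metadata.getOperationType());
  }
}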
From 301d9ab65f3983ecf77b192d4af9401b8d60b059 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 13 Dec 2021 10:01:47 +0800 Subject: [PATCH 3/8] [HUDI-2990] Delete partitions without metadata sync to hms --- .../hudi/sync/common/AbstractSyncHoodieClient.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index a5556b617fddf..8c7c0538b2662 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -167,12 +167,9 @@ public boolean isDeletePartition() { } else { hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata(); } - if (hoodieCommitMetadata != null) { - if (hoodieCommitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION)) { - return true; - } - } else { - return false; + + if (hoodieCommitMetadata != null && hoodieCommitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION)) { + return true; } } catch (Exception e) { throw new HoodieSyncException("Failed to get commit metadata", e); } From c556f448e5db4e40fbd5b0a3ab81f3cfa8c30914 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 13 Dec 2021 14:44:14 +0800 Subject: [PATCH 4/8] [HUDI-2990] Sync to HMS when deleting partitions --- .../org/apache/hudi/common/table/TableSchemaResolver.java | 6 +++--- .../main/java/org/apache/hudi/dla/HoodieDLAClient.java | 2 +- .../src/main/java/org/apache/hudi/hive/HiveSyncTool.java | 4 ++-- .../main/java/org/apache/hudi/hive/HoodieHiveClient.java | 8 ++++---- .../java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 8 ++++---- .../org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 8 ++++---- .../main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java | 6 ++++-- .../apache/hudi/sync/common/AbstractSyncHoodieClient.java | 5 +++-- 8 files changed, 25 insertions(+), 22 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index e1dba575bb334..d1a4d96637543 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -418,15 +418,15 @@ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAd /** * Get the latest commit's metadata.
*/ - public HoodieCommitMetadata getLatestCommitMetadata() { + public Option getLatestCommitMetadata() { try { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); if (timeline.lastInstant().isPresent()) { HoodieInstant instant = timeline.lastInstant().get(); byte[] data = timeline.getInstantDetails(instant).get(); - return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); } else { - return null; + return Option.empty(); } } catch (Exception e) { throw new HoodieException("Failed to get commit metadata", e); } diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index abf8147ef41ee..c3b704bb642f7 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -289,7 +289,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit @Override public void dropPartitionsToTable(String tableName, List partitionsToDelete) { - throw new UnsupportedOperationException("No support for dropPartitionsToTable"); + throw new UnsupportedOperationException("Not support dropPartitionsToTables yet."); } public Map, String> scanTablePartitions(String tableName) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index fc670cf0face0..85729a02a5594 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -338,12 +338,12 @@ private Map getSparkSerdeProperties(boolean readAsOptimized) { * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). */ - private boolean syncPartitions(String tableName, List writtenPartitionsSince, boolean isDeletePartition) { + private boolean syncPartitions(String tableName, List writtenPartitionsSince, boolean isDropPartition) { boolean partitionsChanged; try { List hivePartitions = hoodieHiveClient.scanTablePartitions(tableName); List partitionEvents = - hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDeletePartition); + hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition); List newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD); if (!newPartitions.isEmpty()) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index cfac71923bca5..287de571dfd30 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -126,8 +126,8 @@ public void updatePartitionsToTable(String tableName, List changedPartit * Drop the given partitions from the metastore table.
*/ @Override - public void dropPartitionsToTable(String tableName, List partitionsToDelete) { - ddlExecutor.dropPartitionsToTable(tableName, partitionsToDelete); + public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop); } /** @@ -162,7 +162,7 @@ List getPartitionEvents(List tablePartitions, List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDeletePartition) { + List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDropPartition) { Map paths = new HashMap<>(); for (Partition tablePartition : tablePartitions) { List hivePartitionValues = tablePartition.getValues(); @@ -178,7 +178,7 @@ List getPartitionEvents(List tablePartitions, List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - if (isDeletePartition) { + if (isDropPartition) { events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); } else { if (!storagePartitionValues.isEmpty()) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 304bdc8dbfcd5..d3efebe856c7b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -227,15 +227,15 @@ public void updatePartitionsToTable(String tableName, List changedPartit } @Override - public void dropPartitionsToTable(String tableName, List dropPartitions) { - if (dropPartitions.isEmpty()) { + public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + if (partitionsToDrop.isEmpty()) { LOG.info("No partitions to drop for " + tableName); return; } - LOG.info("Drop partitions " + dropPartitions.size() + " on " + tableName); + LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName); try { - for (String dropPartition : dropPartitions) { + for (String dropPartition : partitionsToDrop) { client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false); LOG.info("Drop partition " + dropPartition + " on " + tableName); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 10d3c4727bdbb..7161194e8a18d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -127,15 +127,15 @@ public Map getTableSchema(String tableName) { } @Override - public void dropPartitionsToTable(String tableName, List dropPartitions) { - if (dropPartitions.isEmpty()) { + public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + if (partitionsToDrop.isEmpty()) { LOG.info("No partitions to drop for " + tableName); return; } - LOG.info("Drop partitions " + dropPartitions.size() + " on " + tableName); + LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName); try { - for (String dropPartition : dropPartitions) { + for (String dropPartition : partitionsToDrop) { metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false); LOG.info("Drop partition " + dropPartition + " on " + tableName); } diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index de91d2362f130..493d4eea9573b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -143,8 +143,10 @@ public Map getTableSchema(String tableName) { } @Override - public void dropPartitionsToTable(String tableName, List partitionsToDelete) { - throw new UnsupportedOperationException("No support for dropPartitionsToTable"); + public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + partitionsToDrop.stream() + .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition)) + .forEach(this::runSQL); } @Override diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 8c7c0538b2662..a27a54bb7e3cb 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -161,14 +161,15 @@ public MessageType getDataSchema() { public boolean isDeletePartition() { try { - HoodieCommitMetadata hoodieCommitMetadata; + Option hoodieCommitMetadata; if (withOperationField) { hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata(); } else { hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata(); } - if (hoodieCommitMetadata != null && hoodieCommitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION)) { + if (hoodieCommitMetadata.isPresent() + && hoodieCommitMetadata.get().getOperationType().equals(WriteOperationType.DELETE_PARTITION)) { return true; } } catch (Exception e) { From 6184de8bc6d18499d0ff49a0ef8f92f8ba20ba6e Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 13 Dec 2021 15:55:11 +0800 Subject: [PATCH 5/8] [HUDI-2990] Sync to HMS when deleting partitions --- .../main/java/org/apache/hudi/dla/HoodieDLAClient.java | 2 +- .../src/main/java/org/apache/hudi/hive/HiveSyncTool.java | 8 ++++---- .../apache/hudi/sync/common/AbstractSyncHoodieClient.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index c3b704bb642f7..3ee48fe956322 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -289,7 +289,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit @Override public void dropPartitionsToTable(String tableName, List partitionsToDelete) { - throw new UnsupportedOperationException("Not support dropPartitionsToTables yet."); + throw new UnsupportedOperationException("Not support dropPartitionsToTable yet."); } public Map, String> scanTablePartitions(String tableName) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 85729a02a5594..f07ab88c071e2 100644 --- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -165,13 +165,13 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Check if the necessary table exists boolean tableExists = hoodieHiveClient.doesTableExist(tableName); - // check if isDeletePartition - boolean isDeletePartition = hoodieHiveClient.isDeletePartition(); + // check if isDropPartition + boolean isDropPartition = hoodieHiveClient.isDropPartition(); // check if schemaChanged boolean schemaChanged = false; - if (!isDeletePartition) { + if (!isDropPartition) { // Get the parquet schema for this table looking at the latest commit MessageType schema = hoodieHiveClient.getDataSchema(); @@ -199,7 +199,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); // Sync the partitions if needed - boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDeletePartition); + boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!cfg.isConditionalSync || meetSyncConditions) { hoodieHiveClient.updateLastCommitTimeSynced(tableName); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index a27a54bb7e3cb..ca6300c349e96 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -159,7 +159,7 @@ public MessageType getDataSchema() { } } - public boolean isDeletePartition() { + public boolean isDropPartition() { try { Option hoodieCommitMetadata; if (withOperationField) { From 9326eac2a62cb47ee3458e9935bcf2948e53eb60 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 13 Dec 2021 15:59:47 +0800 Subject: [PATCH 6/8] [HUDI-2990] Sync to HMS when deleting partitions --- .../src/main/java/org/apache/hudi/dla/HoodieDLAClient.java | 2 +- .../src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java | 6 +++--- .../apache/hudi/sync/common/AbstractSyncHoodieClient.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index 3ee48fe956322..77d7362fa8166 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -288,7 +288,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit } @Override - public void dropPartitionsToTable(String tableName, List partitionsToDelete) { + public void dropPartitionsToTable(String tableName, List partitionsToDrop) { throw new UnsupportedOperationException("Not support dropPartitionsToTable yet."); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java index 97ee7ab4a534d..dc37d92b24983 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java 
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java @@ -82,12 +82,12 @@ public void createTable(String tableName, MessageType storageSchema, String inpu public void updatePartitionsToTable(String tableName, List changedPartitions); /** - * Remove partitions for a given table. + * Drop partitions for a given table. * * @param tableName - * @param partitionsToDelete + * @param partitionsToDrop */ - public void dropPartitionsToTable(String tableName, List partitionsToDelete); + public void dropPartitionsToTable(String tableName, List partitionsToDrop); public void close(); } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index ca6300c349e96..0a277bef095b7 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -100,7 +100,7 @@ public abstract void createTable(String tableName, MessageType storageSchema, public abstract void updatePartitionsToTable(String tableName, List changedPartitions); - public abstract void dropPartitionsToTable(String tableName, List partitionsToDelete); + public abstract void dropPartitionsToTable(String tableName, List partitionsToDrop); public void updateTableProperties(String tableName, Map tableProperties) {} From 1abf0cb7b2cedf5df1a2a32f02d15a24c072b5fa Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 13 Dec 2021 16:03:26 +0800 Subject: [PATCH 7/8] [HUDI-2990] Sync to HMS when deleting partitions --- .../hudi/command/AlterHoodieTableDropPartitionCommand.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index 10c40ed064464..1c295fb5bd8c6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -69,7 +69,7 @@ extends RunnableCommand { val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table) val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table) val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") - val partitionsToDelete = normalizedSpecs.map { spec => + val partitionsToDrop = normalizedSpecs.map { spec => hoodieCatalogTable.partitionFields.map{ partitionColumn => val encodedPartitionValue = if (enableEncodeUrl) { PartitionPathEncodeUtils.escapePathName(spec(partitionColumn)) @@ -91,7 +91,7 @@ extends RunnableCommand { TBL_NAME.key -> hoodieCatalogTable.tableName, TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName, OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL, - PARTITIONS_TO_DELETE.key -> partitionsToDelete, + PARTITIONS_TO_DELETE.key -> partitionsToDrop, RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), PARTITIONPATH_FIELD.key -> partitionFields, From a1610f3871ad916ad32a2474170c4401a4cd71ec Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars 
Date: Mon, 13 Dec 2021 17:55:47 +0800 Subject: [PATCH 8/8] trigger ci
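With patch 8 the series is complete: a Spark SQL ALTER TABLE ... DROP PARTITION is written as a DELETE_PARTITION operation, and the next Hive sync replays it against the metastore, via the metastore client's dropPartition in HMS mode and via ALTER TABLE ... DROP PARTITION statements in JDBC mode. For reference, a self-contained sketch of the DDL shape the JDBC branch emits; mapping a storage path onto a key=value partition spec is an assumption made here for illustration (the patch interpolates the executor's partition string as-is):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DropPartitionSql {
  // Builds an ALTER TABLE ... DROP PARTITION statement in the shape issued by
  // JDBCExecutor#dropPartitionsToTable. Splitting "2021/12/12" against the
  // partition fields is illustrative; see the note above.
  static String build(String table, List<String> fields, String partitionPath) {
    String[] values = partitionPath.split("/");
    String spec = IntStream.range(0, fields.size())
        .mapToObj(i -> String.format("`%s`='%s'", fields.get(i), values[i]))
        .collect(Collectors.joining(", "));
    return String.format("ALTER TABLE `%s` DROP PARTITION (%s)", table, spec);
  }

  public static void main(String[] args) {
    // Prints: ALTER TABLE `hudi_trips` DROP PARTITION (`year`='2021', `month`='12', `day`='12')
    System.out.println(build("hudi_trips", Arrays.asList("year", "month", "day"), "2021/12/12"));
  }
}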