diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
deleted file mode 100644
index 50fdf36c81701..0000000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hive.HiveSyncConfig;
-import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.table.HoodieSparkTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import scala.Tuple2;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * A tool with spark-submit to drop Hudi table partitions.
- * <p>
- * You can dry run this tool with the following command to list the table partitions and the corresponding data files that would be deleted.
- * ```
- * spark-submit \
- * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
- * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
- * --master local[*] \
- * --driver-memory 1g \
- * --executor-memory 1g \
- * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
- * --base-path basePath \
- * --table-name tableName \
- * --mode dry_run \
- * --partitions partition1,partition2
- * ```
- *
- * <p>
- * You can delete the table partitions with '--mode delete':
- *
- * - DELETE ("delete"): This tool will mask/tombstone these partitions and the corresponding data files and let the cleaner delete these files later.
- * - Also, you can set --sync-hive-meta to sync the dropped partitions to Hive.
- * <p>
- * Example command:
- * ```
- * spark-submit \
- * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
- * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
- * --master local[*] \
- * --driver-memory 1g \
- * --executor-memory 1g \
- * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
- * --base-path basePath \
- * --table-name tableName \
- * --mode delete \
- * --partitions partition1,partition2
- * ```
- *
- * You can also use --help to see all available configs.
- */
-public class HoodieDropPartitionsTool implements Serializable {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieDropPartitionsTool.class);
-  // Spark context
-  private final transient JavaSparkContext jsc;
-  // config
-  private final Config cfg;
-  // Properties with source, hoodie client, key generator etc.
-  private TypedProperties props;
-
-  private final HoodieTableMetaClient metaClient;
-
-  public HoodieDropPartitionsTool(JavaSparkContext jsc, Config cfg) {
-    this.jsc = jsc;
-    this.cfg = cfg;
-
-    this.props = cfg.propsFilePath == null
-        ? UtilHelpers.buildProperties(cfg.configs)
-        : readConfigFromFileSystem(jsc, cfg);
-    this.metaClient = HoodieTableMetaClient.builder()
-        .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath)
-        .setLoadActiveTimelineOnLoad(true)
-        .build();
-  }
-
-  /**
-   * Reads config from the file system.
-   *
-   * @param jsc {@link JavaSparkContext} instance.
-   * @param cfg {@link Config} instance.
-   * @return the {@link TypedProperties} instance.
-   */
-  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
-        .getProps(true);
-  }
-
-  public enum Mode {
-    // Mask/Tombstone these partitions and corresponding data files and let cleaner delete these files later.
-    DELETE,
-    // Dry run by listing the table partitions and corresponding data files which would be deleted.
-    DRY_RUN
-  }
-
-  public static class Config implements Serializable {
-    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
-    public String basePath = null;
-    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: "
-        + "Set \"delete\" to mask/tombstone the given table partitions and corresponding data files and let the cleaner delete these files later; "
-        + "Set \"dry_run\" to only list the table partitions that would be deleted and their corresponding data files.", required = true)
-    public String runningMode = null;
-    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
-    public String tableName = null;
-    @Parameter(names = {"--partitions", "-p"}, description = "Comma separated list of partitions to delete.", required = true)
-    public String partitions = null;
-    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert/upsert/delete", required = false)
-    public int parallelism = 1500;
-    @Parameter(names = {"--instant-time", "-it"}, description = "Instant time for the delete table partitions operation.", required = false)
-    public String instantTime = null;
-    @Parameter(names = {"--sync-hive-meta", "-sync"}, description = "Sync information to HMS.", required = false)
-    public boolean syncToHive = false;
-    @Parameter(names = {"--hive-database", "-db"}, description = "Database to sync to.", required = false)
-    public String hiveDataBase = null;
-    @Parameter(names = {"--hive-table-name"}, description = "Table to sync to.", required = false)
-    public String hiveTableName = null;
-    @Parameter(names = {"--hive-user-name", "-user"}, description = "Hive user name to use.", required = false)
-    public String hiveUserName = "hive";
-    @Parameter(names = {"--hive-pass-word", "-pass"}, description = "Hive password to use.", required = false)
-    public String hivePassWord = "hive";
-    @Parameter(names = {"--hive-jdbc-url", "-jdbc"}, description = "Hive JDBC URL to use.", required = false)
-    public String hiveURL = "jdbc:hive2://localhost:10000";
-    @Parameter(names = {"--hive-partition-field"}, description = "Comma separated list of fields in the hive table to use for determining hive partition columns.", required = false)
-    public String hivePartitionsField = "";
-    @Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC for hive synchronization.", required = false)
-    public boolean hiveUseJdbc = true;
-    @Parameter(names = {"--hive-metastore-uris"}, description = "Hive metastore URIs to use.", required = false)
-    public String hiveHMSUris = null;
-    @Parameter(names = {"--hive-sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.", required = false)
-    public String hiveSyncMode = "hms";
-    @Parameter(names = {"--hive-sync-ignore-exception"}, description = "Ignore hive sync exceptions.", required = false)
-    public boolean hiveSyncIgnoreException = false;
-    @Parameter(names = {"--hive-partition-value-extractor-class"}, description = "Class which implements PartitionValueExtractor to extract the partition values,"
-        + " default 'SlashEncodedDayPartitionValueExtractor'.", required = false)
-    public String partitionValueExtractorClass = "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor";
-    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
-    public String sparkMaster = null;
-    @Parameter(names = {"--spark-memory", "-sm"}, description = "Spark memory to use", required = false)
-    public String sparkMemory = "1g";
-    @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
-        + "hoodie client for deleting table partitions")
-    public String propsFilePath = null;
-    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
-        splitter = IdentitySplitter.class)
-    public List<String> configs = new ArrayList<>();
-    @Parameter(names = {"--help", "-h"}, help = true)
-    public Boolean help = false;
-
-    @Override
-    public String toString() {
-      return "HoodieDropPartitionsToolConfig {\n"
-          + "   --base-path " + basePath + ", \n"
-          + "   --mode " + runningMode + ", \n"
-          + "   --table-name " + tableName + ", \n"
-          + "   --partitions " + partitions + ", \n"
-          + "   --parallelism " + parallelism + ", \n"
-          + "   --instant-time " + instantTime + ", \n"
-          + "   --sync-hive-meta " + syncToHive + ", \n"
-          + "   --hive-database " + hiveDataBase + ", \n"
-          + "   --hive-table-name " + hiveTableName + ", \n"
-          + "   --hive-user-name " + "Masked" + ", \n"
-          + "   --hive-pass-word " + "Masked" + ", \n"
-          + "   --hive-jdbc-url " + hiveURL + ", \n"
-          + "   --hive-partition-field " + hivePartitionsField + ", \n"
-          + "   --hive-sync-use-jdbc " + hiveUseJdbc + ", \n"
-          + "   --hive-metastore-uris " + hiveHMSUris + ", \n"
-          + "   --hive-sync-ignore-exception " + hiveSyncIgnoreException + ", \n"
-          + "   --hive-partition-value-extractor-class " + partitionValueExtractorClass + ", \n"
-          + "   --spark-master " + sparkMaster + ", \n"
-          + "   --spark-memory " + sparkMemory + ", \n"
-          + "   --props " + propsFilePath + ", \n"
-          + "   --hoodie-conf " + configs
-          + "\n}";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      Config config = (Config) o;
-      return basePath.equals(config.basePath)
-          && Objects.equals(runningMode, config.runningMode)
-          && Objects.equals(tableName, config.tableName)
-          && Objects.equals(partitions, config.partitions)
-          && Objects.equals(instantTime, config.instantTime)
-          && Objects.equals(syncToHive, config.syncToHive)
-          && Objects.equals(hiveDataBase, config.hiveDataBase)
-          && Objects.equals(hiveTableName, config.hiveTableName)
-          && Objects.equals(hiveUserName, config.hiveUserName)
-          && Objects.equals(hivePassWord, config.hivePassWord)
-          && Objects.equals(hiveURL, config.hiveURL)
-          && Objects.equals(hivePartitionsField, config.hivePartitionsField)
-          && Objects.equals(hiveUseJdbc, config.hiveUseJdbc)
-          && Objects.equals(hiveHMSUris, config.hiveHMSUris)
-          && Objects.equals(partitionValueExtractorClass, config.partitionValueExtractorClass)
-          && Objects.equals(sparkMaster, config.sparkMaster)
-          && Objects.equals(sparkMemory, config.sparkMemory)
-          && Objects.equals(propsFilePath, config.propsFilePath)
-          && Objects.equals(configs, config.configs)
-          && Objects.equals(hiveSyncIgnoreException, config.hiveSyncIgnoreException);
-    }
-
-    @Override
-    public int hashCode() {
-      // 'help' is excluded so that hashCode stays consistent with equals.
-      return Objects.hash(basePath, runningMode, tableName, partitions, instantTime,
-          syncToHive, hiveDataBase, hiveTableName, hiveUserName, hivePassWord, hiveURL,
-          hivePartitionsField, hiveUseJdbc, hiveHMSUris, partitionValueExtractorClass,
-          sparkMaster, sparkMemory, propsFilePath, configs, hiveSyncIgnoreException);
-    }
-  }
-
-  public static void main(String[] args) {
-    final Config cfg = new Config();
-    JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
-      cmd.usage();
-      System.exit(1);
-    }
-    SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Drop-Table-Partitions", cfg.sparkMaster);
-    sparkConf.set("spark.executor.memory", cfg.sparkMemory);
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-    HoodieDropPartitionsTool tool = new HoodieDropPartitionsTool(jsc, cfg);
-    try {
-      tool.run();
-    } catch (Throwable throwable) {
-      LOG.error("Failed to delete table partitions for " + cfg.toString(), throwable);
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  public void run() {
-    try {
-      if (StringUtils.isNullOrEmpty(cfg.instantTime)) {
-        cfg.instantTime = HoodieActiveTimeline.createNewInstantTime();
-      }
-      LOG.info(cfg.toString());
-
-      Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
-      switch (mode) {
-        case DELETE:
-          LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete mode ****** ");
-          doDeleteTablePartitions();
-          syncToHiveIfNecessary();
-          break;
-        case DRY_RUN:
-          LOG.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode ****** ");
-          dryRun();
-          break;
-        default:
-          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quitting the job directly");
-      }
-    } catch (Exception e) {
-      throw new HoodieException("Unable to delete table partitions in " + cfg.basePath, e);
-    }
-  }
-
-  public void dryRun() {
-    try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
-      HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
-      List<String> parts = Arrays.asList(cfg.partitions.split(","));
-      Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(parts, parts.size()).distinct()
-          .mapToPair(partitionPath -> new Tuple2<>(partitionPath, table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList())))
-          .collectAsMap();
-      printDeleteFilesInfo(partitionToReplaceFileIds);
-    }
-  }
-
-  private void syncToHiveIfNecessary() {
-    if (cfg.syncToHive) {
-      HiveSyncConfig hiveSyncConfig = buildHiveSyncProps();
-      syncHive(hiveSyncConfig);
-    }
-  }
-
-  private void doDeleteTablePartitions() {
-    // Auto-commit is required so that SparkDeletePartitionCommitActionExecutor#execute commits the replace instant.
-    this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
-    try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
-      List<String> partitionsToDelete = Arrays.asList(cfg.partitions.split(","));
-      client.startCommitWithTime(cfg.instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      client.deletePartitions(partitionsToDelete, cfg.instantTime);
-    }
-  }
-
-  private HiveSyncConfig buildHiveSyncProps() {
-    verifyHiveConfigs();
-    TypedProperties props = new TypedProperties();
-    props.put(DataSourceWriteOptions.HIVE_DATABASE().key(), cfg.hiveDataBase);
-    props.put(DataSourceWriteOptions.HIVE_TABLE().key(), cfg.hiveTableName);
-    props.put(DataSourceWriteOptions.HIVE_USER().key(), cfg.hiveUserName);
-    props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
-    props.put(DataSourceWriteOptions.HIVE_URL().key(), cfg.hiveURL);
-    props.put(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), cfg.hivePartitionsField);
-    props.put(DataSourceWriteOptions.HIVE_USE_JDBC().key(), cfg.hiveUseJdbc);
-    props.put(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), cfg.hiveSyncMode);
-    props.put(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(), cfg.hiveSyncIgnoreException);
-    props.put(DataSourceWriteOptions.PARTITIONS_TO_DELETE().key(), cfg.partitions);
-    props.put(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(), cfg.partitionValueExtractorClass);
-    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), cfg.hivePartitionsField);
-
-    return DataSourceUtils.buildHiveSyncConfig(props, cfg.basePath, "PARQUET");
-  }
-
-  private void verifyHiveConfigs() {
-    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(cfg.hiveDataBase), "Hive database name couldn't be null or empty when meta sync is enabled, please set --hive-database/-db.");
-    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(cfg.hiveTableName), "Hive table name couldn't be null or empty when meta sync is enabled, please set --hive-table-name.");
-  }
-
-  private void syncHive(HiveSyncConfig hiveSyncConfig) {
-    LOG.info("Syncing target hoodie table with hive table("
-        + hiveSyncConfig.tableName
-        + "). Hive metastore URL :"
-        + hiveSyncConfig.jdbcUrl
-        + ", basePath :" + cfg.basePath);
-    LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
-    FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
-    HiveConf hiveConf = new HiveConf();
-    if (!StringUtils.isNullOrEmpty(cfg.hiveHMSUris)) {
-      hiveConf.set("hive.metastore.uris", cfg.hiveHMSUris);
-    }
-    hiveConf.addResource(fs.getConf());
-    LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
-    HiveSyncTool hiveSyncTool = new HiveSyncTool(hiveSyncConfig, hiveConf, fs);
-    hiveSyncTool.syncHoodieTable();
-  }
-
-  /**
-   * Prints the data files and partitions to delete.
-   *
-   * @param partitionToReplaceFileIds mapping from partition path to the data file IDs to be replaced/deleted.
-   */
-  private void printDeleteFilesInfo(Map<String, List<String>> partitionToReplaceFileIds) {
-    LOG.info("Data files and partitions to delete : ");
-    for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
-      LOG.info(String.format("Partitions : %s, corresponding data file IDs : %s", entry.getKey(), entry.getValue()));
-    }
-  }
-}
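
Note for readers landing on this deletion while looking for the drop-partitions functionality: the same operation is exposed through the Spark datasource write path via the `delete_partition` write operation (the deleted tool itself references `DataSourceWriteOptions.PARTITIONS_TO_DELETE` above). The sketch below is illustrative and not part of this change; the base path, table name, and partition values are placeholders, and depending on your Hudi version additional write options (e.g. record key and precombine field) may be required.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class DropPartitionsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("drop-hudi-partitions")
        .master("local[*]")
        .getOrCreate();

    // delete_partition is driven entirely by the write options below,
    // not by the rows, so an empty DataFrame is sufficient as input.
    Dataset<Row> empty = spark.emptyDataFrame();

    empty.write().format("hudi")
        // Run a delete_partition operation instead of an upsert.
        .option("hoodie.datasource.write.operation", "delete_partition")
        // Comma separated partitions to drop, mirroring the tool's --partitions flag.
        .option("hoodie.datasource.write.partitions.to.delete", "partition1,partition2")
        .option("hoodie.table.name", "tableName")
        .mode(SaveMode.Append)
        .save("basePath");

    spark.stop();
  }
}
```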
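For jobs that embed the drop in their own Spark driver rather than going through the datasource, the write-client calls the tool wrapped can be used directly. This is a condensed, hypothetical sketch of `doDeleteTablePartitions()` from the deleted file; the `UtilHelpers.createHoodieClient` call and timeline constants are copied from it, while the method name, parameters, and parallelism value are placeholders.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;

public class DeletePartitionsSketch {

  /** Drops the given comma separated partitions from the Hudi table at basePath. */
  public static void deletePartitions(JavaSparkContext jsc, String basePath, String partitions, int parallelism) {
    TypedProperties props = new TypedProperties();
    // The replace instant is committed inside SparkDeletePartitionCommitActionExecutor#execute,
    // so auto-commit must stay enabled, exactly as in the tool above.
    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
    try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, basePath, "", parallelism, Option.empty(), props)) {
      List<String> partitionsToDelete = Arrays.asList(partitions.split(","));
      String instantTime = HoodieActiveTimeline.createNewInstantTime();
      // deletePartitions writes a replacecommit; the cleaner removes the masked files later.
      client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
      client.deletePartitions(partitionsToDelete, instantTime);
    }
  }
}
```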