diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index a4e2dc4055ae..c18d3a93fe34 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -33,8 +33,8 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.AvroRuntimeException;
@@ -289,4 +289,34 @@ public String repairDeprecatePartition(
     }
     return "Repair succeeded";
   }
+
+  @CliCommand(value = "rename partition",
+      help = "Rename partition. Usage: rename partition --oldPartition <oldPartition> --newPartition <newPartition>")
+  public String renamePartition(
+      @CliOption(key = {"oldPartition"}, help = "Partition value to be renamed", mandatory = true,
+          unspecifiedDefaultValue = "") String oldPartition,
+      @CliOption(key = {"newPartition"}, help = "New partition value after rename", mandatory = true,
+          unspecifiedDefaultValue = "") String newPartition,
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory) throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.RENAME_PARTITION.toString(), master, sparkMemory,
+        HoodieCLI.getTableMetaClient().getBasePathV2().toString(), oldPartition, newPartition);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+
+    if (exitCode != 0) {
+      return "rename partition failed!";
+    }
+    return "rename partition succeeded";
+  }
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index e293f25d0b71..2a49ed2c4b65 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -94,7 +94,8 @@ public class SparkMain {
   enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
     COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
-    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE, REPAIR_DEPRECATED_PARTITION
+    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
+    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION
   }
 
   public static void main(String[] args) throws Exception {
@@ -282,6 +283,10 @@ public static void main(String[] args) throws Exception {
         assert (args.length == 4);
         returnCode = repairDeprecatedPartition(jsc, args[3]);
         break;
+      case RENAME_PARTITION:
+        assert (args.length == 6);
+        returnCode = renamePartition(jsc, args[3], args[4], args[5]);
+        break;
       default:
         break;
     }
@@ -428,35 +433,77 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplica
 
   public static int repairDeprecatedPartition(JavaSparkContext jsc, String basePath) {
     SQLContext sqlContext = new SQLContext(jsc);
-    Dataset<Row> recordsToRewrite = sqlContext.read().option("hoodie.datasource.read.extract.partition.values.from.path","false").format("hudi").load(basePath)
-        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH + "'")
-        .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD).drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
-        .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    Dataset<Row> recordsToRewrite = getRecordsToRewrite(basePath, PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH, sqlContext);
 
     if (!recordsToRewrite.isEmpty()) {
       recordsToRewrite.cache();
-      HoodieTableMetaClient metaClient =
-          HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
-
-      Map<String, String> propsMap = new HashMap<>();
-      metaClient.getTableConfig().getProps().forEach((k, v) -> propsMap.put(k.toString(), v.toString()));
-      propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
-      propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), metaClient.getTableConfig().getRecordKeyFieldProp());
-      propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), metaClient.getTableConfig().getPartitionFieldProp());
-      propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), metaClient.getTableConfig().getKeyGeneratorClassName());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+      Map<String, String> propsMap = getPropsForRewrite(metaClient);
+      rewriteRecordsToNewPartition(basePath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH, recordsToRewrite, metaClient, propsMap);
+      // after re-writing, we can safely delete older data.
+      deleteOlderPartition(basePath, PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH, recordsToRewrite, propsMap);
+    }
+    return 0;
+  }
 
-      recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(),
-          functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)).write().options(propsMap)
-          .option("hoodie.datasource.write.operation","insert").format("hudi").mode("Append").save(basePath);
+  public static int renamePartition(JavaSparkContext jsc, String basePath, String oldPartition, String newPartition) {
+    SQLContext sqlContext = new SQLContext(jsc);
+    Dataset<Row> recordsToRewrite = getRecordsToRewrite(basePath, oldPartition, sqlContext);
+    if (!recordsToRewrite.isEmpty()) {
+      recordsToRewrite.cache();
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+      Map<String, String> propsMap = getPropsForRewrite(metaClient);
+      rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
       // after re-writing, we can safely delete older data.
-      propsMap.put("hoodie.datasource.write.partitions.to.delete", PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH);
-      recordsToRewrite.write().options(propsMap).option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value()).format("hudi")
-          .mode("Append").save(basePath);
+      deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
     }
     return 0;
   }
 
+  private static void deleteOlderPartition(String basePath, String oldPartition, Dataset<Row> recordsToRewrite, Map<String, String> propsMap) {
+    propsMap.put("hoodie.datasource.write.partitions.to.delete", oldPartition);
+    recordsToRewrite.write()
+        .options(propsMap)
+        .option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value())
+        .format("hudi")
+        .mode("Append")
+        .save(basePath);
+  }
+
+  private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
+    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+        .write()
+        .options(propsMap)
+        .option("hoodie.datasource.write.operation", "insert")
+        .format("hudi")
+        .mode("Append")
+        .save(basePath);
+  }
+
+  private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
+    return sqlContext.read()
+        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
+        .format("hudi")
+        .load(basePath)
+        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+        .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+        .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
+        .drop(HoodieRecord.FILENAME_METADATA_FIELD)
+        .drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+  }
+
+  private static Map<String, String> getPropsForRewrite(HoodieTableMetaClient metaClient) {
+    Map<String, String> propsMap = new HashMap<>();
+    metaClient.getTableConfig().getProps().forEach((k, v) -> propsMap.put(k.toString(), v.toString()));
+    propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
+    propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), metaClient.getTableConfig().getRecordKeyFieldProp());
+    propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), metaClient.getTableConfig().getPartitionFieldProp());
+    propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), metaClient.getTableConfig().getKeyGeneratorClassName());
+    return propsMap;
+  }
+
   private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
                                  String sourcePath, String recordKeyCols, String partitionFields, String parallelism,
                                  String schemaProviderClass, String bootstrapIndexClass, String selectorClass,
                                  String keyGenerator, String fullBootstrapInputProvider,
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
index 3f35c9d96aa1..92d3fc52964b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -73,6 +73,7 @@
 import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
 import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum;
 import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -319,4 +320,51 @@ public void testRepairDeprecatedPartition() throws IOException {
     }
   }
 
+  @Test
+  public void testRenamePartition() throws IOException {
+    tablePath = tablePath + "/rename_partition_test/";
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName())
+        .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
+        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
+        .setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_1)
+        .setPartitionFields("partition_path")
+        .setRecordKeyFields("_row_key")
+        .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName())
+        .initTable(HoodieCLI.conf, tablePath);
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(TRIP_EXAMPLE_SCHEMA).build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), config)) {
+      String newCommitTime = "001";
+      int numRecords = 20;
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = context().getJavaSparkContext().parallelize(records, 1);
+      List<WriteStatus> result = client.upsert(writeRecords, newCommitTime).collect();
+      Assertions.assertNoWriteErrors(result);
+
+      SQLContext sqlContext = context().getSqlContext();
+      long totalRecs = sqlContext.read().format("hudi").load(tablePath).count();
+      assertEquals(totalRecs, 20);
+      long totalRecsInOldPartition = sqlContext.read().format("hudi").load(tablePath)
+          .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + DEFAULT_FIRST_PARTITION_PATH + "'").count();
+
+      // Execute rename partition command
+      assertEquals(0, SparkMain.renamePartition(jsc(), tablePath, DEFAULT_FIRST_PARTITION_PATH, "2016/03/18"));
+
+      // there should not be any records in old partition
+      totalRecs = sqlContext.read().format("hudi").load(tablePath)
+          .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + DEFAULT_FIRST_PARTITION_PATH + "'").count();
+      assertEquals(totalRecs, 0);
+
+      // all records from old partition should have been migrated to new partition
+      totalRecs = sqlContext.read().format("hudi").load(tablePath)
+          .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + "2016/03/18" + "'").count();
+      assertEquals(totalRecs, totalRecsInOldPartition);
+    }
+  }
 }
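Usage note (not part of the patch): the new "rename partition" CLI command launches the RENAME_PARTITION Spark command above, which rewrites every record of the old partition under the new partition value and then issues a DELETE_PARTITION commit for the old one. Below is a minimal sketch of driving the same SparkMain.renamePartition entry point programmatically, mirroring the test; the class name, table path, Spark master, and partition values are illustrative assumptions, not values from the patch.

import org.apache.hudi.cli.commands.SparkMain;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class RenamePartitionExample {
  public static void main(String[] args) {
    // Local Spark context for illustration; the CLI command launches this via SparkLauncher instead.
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("rename-partition-example").setMaster("local[2]"));
    // Hypothetical existing Hudi table that contains a "2015/03/16" partition.
    String basePath = "file:///tmp/hoodie/sample-table";
    // Rewrites the old partition's records with the new partition value, then deletes the old partition,
    // exactly as SparkMain.renamePartition in the diff does; returns 0 on success.
    int exitCode = SparkMain.renamePartition(jsc, basePath, "2015/03/16", "2016/03/18");
    jsc.stop();
    System.exit(exitCode);
  }
}

From the hudi-cli shell the equivalent invocation would be: rename partition --oldPartition 2015/03/16 --newPartition 2016/03/18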