diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java index 8b6d47b9a9aaf..e5b07fd99f403 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java @@ -93,6 +93,8 @@ public String rollbackToSavepoint( @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String instantTime, @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = {"lazyFailedWritesCleanPolicy"}, help = "True if FailedWritesCleaningPolicy is lazy", + unspecifiedDefaultValue = "false") final String lazyFailedWritesCleanPolicy, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", help = "Spark executor memory") final String sparkMemory) throws Exception { @@ -110,7 +112,7 @@ public String rollbackToSavepoint( SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), master, sparkMemory, - instantTime, metaClient.getBasePath()); + instantTime, metaClient.getBasePath(), lazyFailedWritesCleanPolicy); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 0de1a1adfe0be..4b1bcd5db36e7 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import 
org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -35,6 +36,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieBootstrapConfig; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieSavepointException; @@ -102,8 +104,8 @@ public static void main(String[] args) throws Exception { returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]); break; case ROLLBACK_TO_SAVEPOINT: - assert (args.length == 5); - returnCode = rollbackToSavepoint(jsc, args[3], args[4]); + assert (args.length == 6); + returnCode = rollbackToSavepoint(jsc, args[3], args[4], Boolean.parseBoolean(args[5])); break; case IMPORT: case UPSERT: @@ -285,7 +287,7 @@ protected static void clean(JavaSparkContext jsc, String basePath, String propsF protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) { try { - SparkRDDWriteClient client = createHoodieClient(jsc, basePath); + SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false); HoodieWriteConfig config = client.getConfig(); HoodieEngineContext context = client.getEngineContext(); HoodieSparkTable table = HoodieSparkTable.create(config, context, true); @@ -455,7 +457,7 @@ private static int rollback(JavaSparkContext jsc, String instantTime, String bas private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user, String comments, String basePath) throws Exception { - SparkRDDWriteClient client = createHoodieClient(jsc, basePath); + SparkRDDWriteClient 
client = createHoodieClient(jsc, basePath, false); try { client.savepoint(commitTime, user, comments); LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime)); @@ -466,8 +468,8 @@ private static int createSavepoint(JavaSparkContext jsc, String commitTime, Stri } } - private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception { - SparkRDDWriteClient client = createHoodieClient(jsc, basePath); + private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath, boolean lazyCleanPolicy) throws Exception { + SparkRDDWriteClient client = createHoodieClient(jsc, basePath, lazyCleanPolicy); try { client.restoreToSavepoint(savepointTime); LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime)); @@ -479,7 +481,7 @@ private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTim } private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception { - SparkRDDWriteClient client = createHoodieClient(jsc, basePath); + SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false); try { client.deleteSavepoint(savepointTime); LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime)); @@ -500,7 +502,8 @@ private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, S * @throws Exception */ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) { - HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue())); + HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()), + false); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath()) 
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) @@ -517,18 +520,20 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa } } - private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers) throws Exception { - HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers); + private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) throws Exception { + HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers, lazyCleanPolicy); return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config); } - private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { - return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue())); + private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, boolean lazyCleanPolicy) throws Exception { + return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()), lazyCleanPolicy); } - private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers) { + private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) { return HoodieWriteConfig.newBuilder().withPath(basePath) .withRollbackUsingMarkers(rollbackUsingMarkers) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY : + HoodieFailedWritesCleaningPolicy.EAGER).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); } }