@@ -93,6 +93,8 @@ public String rollbackToSavepoint(
       @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String instantTime,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
       @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = {"lazyFailedWritesCleanPolicy"}, help = "True if FailedWriteCleanPolicy is lazy",
+          unspecifiedDefaultValue = "false") final String lazyFailedWritesCleanPolicy,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
           help = "Spark executor memory") final String sparkMemory)
       throws Exception {
@@ -110,7 +112,7 @@ public String rollbackToSavepoint(

     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
     sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), master, sparkMemory,
-        instantTime, metaClient.getBasePath());
+        instantTime, metaClient.getBasePath(), lazyFailedWritesCleanPolicy);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
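A hypothetical invocation of the updated command from the hudi-cli shell, sketched only for context: the `savepoint rollback` command name is an assumption inferred from the method and @CliOption keys above, and the instant, master, and memory values are illustrative:

    savepoint rollback --savepoint 20220311160000 --sparkMaster local[2] --sparkMemory 4G --lazyFailedWritesCleanPolicy true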
31 changes: 18 additions & 13 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -28,13 +28,15 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieBootstrapConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSavepointException;
@@ -102,8 +104,8 @@ public static void main(String[] args) throws Exception {
         returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
         break;
       case ROLLBACK_TO_SAVEPOINT:
-        assert (args.length == 5);
-        returnCode = rollbackToSavepoint(jsc, args[3], args[4]);
+        assert (args.length == 6);
+        returnCode = rollbackToSavepoint(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
         break;
       case IMPORT:
       case UPSERT:
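For reference, the positional contract implied by the launcher call in the previous file is [ROLLBACK_TO_SAVEPOINT, master, sparkMemory, instantTime, basePath, lazyFailedWritesCleanPolicy]; hence the length assertion moves from 5 to 6, and the new flag arrives as a string in args[5] and is decoded with Boolean.parseBoolean.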
@@ -285,7 +287,7 @@ protected static void clean(JavaSparkContext jsc, String basePath, String propsF

   protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
     try {
-      SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+      SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
       HoodieWriteConfig config = client.getConfig();
       HoodieEngineContext context = client.getEngineContext();
       HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
@@ -455,7 +457,7 @@ private static int rollback(JavaSparkContext jsc, String instantTime, String bas

   private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user,
       String comments, String basePath) throws Exception {
-    SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+    SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
     try {
       client.savepoint(commitTime, user, comments);
       LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
@@ -466,8 +468,8 @@ private static int createSavepoint(JavaSparkContext jsc, String commitTime, Stri
     }
   }

-  private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
-    SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+  private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath, boolean lazyCleanPolicy) throws Exception {
+    SparkRDDWriteClient client = createHoodieClient(jsc, basePath, lazyCleanPolicy);
     try {
       client.restoreToSavepoint(savepointTime);
       LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
@@ -479,7 +481,7 @@ private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTim
   }

   private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
-    SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+    SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
     try {
       client.deleteSavepoint(savepointTime);
       LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
@@ -500,7 +502,8 @@ private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, S
    * @throws Exception
    */
   protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
-    HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()));
+    HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()),
+        false);
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
             .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
@@ -517,18 +520,20 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa
     }
   }

-  private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers) throws Exception {
-    HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers);
+  private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) throws Exception {
+    HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers, lazyCleanPolicy);
     return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
   }

-  private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
-    return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()));
+  private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, boolean lazyCleanPolicy) throws Exception {
+    return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()), lazyCleanPolicy);
   }

-  private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers) {
+  private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withRollbackUsingMarkers(rollbackUsingMarkers)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
+            HoodieFailedWritesCleaningPolicy.EAGER).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
   }
 }
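Taken together, getWriteConfig now maps the boolean CLI flag onto HoodieFailedWritesCleaningPolicy. A minimal sketch of the config built when the flag is true, using only the builder calls visible in this diff (the base path is illustrative); LAZY leaves failed writes for the cleaner to reclaim later instead of rolling them back eagerly when the write client initializes:

    HoodieWriteConfig lazyConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // illustrative base path
        .withRollbackUsingMarkers(false)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) // lazyCleanPolicy == true
            .build())
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .build();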