diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 444eae62b2ec4..d98da346e13e4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -599,7 +599,7 @@ public boolean rollback(final String commitInstantTime, boolean skipLocking) thr if (commitInstantOpt.isPresent()) { LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime); Option rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, - commitInstantOpt.get(), false); + commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()); if (rollbackPlanOption.isPresent()) { // execute rollback HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, @@ -1024,7 +1024,7 @@ protected Option inlineCluster(Option> extraMetadata protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); - table.scheduleRollback(context, commitTime, inflightInstant, false); + table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); table.rollback(context, commitTime, inflightInstant, false, false); table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 6046374ba107d..747470f1dbffc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -442,12 +442,13 @@ public abstract Option scheduleCleaning(HoodieEngineContext c * @param context HoodieEngineContext * @param instantTime Instant Time for scheduling rollback * @param instantToRollback instant to be rolled back + * @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false. * @return HoodieRollbackPlan containing info on rollback. */ public abstract Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers); /** * Rollback the (inflight/committed) record changes with the given commit time. @@ -490,7 +491,7 @@ public abstract HoodieRestoreMetadata restore(HoodieEngineContext context, */ public void rollbackInflightCompaction(HoodieInstant inflightInstant) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); - scheduleRollback(context, commitTime, inflightInstant, false); + scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); rollback(context, commitTime, inflightInstant, false, false); getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java index 1116ef9a4dd82..facab71c6237b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java @@ -48,7 +48,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback } table.getMetaClient().reloadActiveTimeline(); String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); - table.scheduleRollback(context, newInstantTime, instantToRollback, false); + table.scheduleRollback(context, newInstantTime, instantToRollback, false, false); table.getMetaClient().reloadActiveTimeline(); CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor( context, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java index db6fbc2620155..661cee4a2e608 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java @@ -52,7 +52,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback } table.getMetaClient().reloadActiveTimeline(); String instantTime = HoodieActiveTimeline.createNewInstantTime(); - table.scheduleRollback(context, instantTime, instantToRollback, false); + table.scheduleRollback(context, instantTime, instantToRollback, false, false); table.getMetaClient().reloadActiveTimeline(); MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor( context, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java index 24edde27642cc..f95ec5d5c9fe1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java @@ -50,6 +50,7 @@ public class BaseRollbackPlanActionExecutor table, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { + boolean skipTimelinePublish, + boolean shouldRollbackUsingMarkers) { super(context, config, table, instantTime); this.instantToRollback = instantToRollback; this.skipTimelinePublish = skipTimelinePublish; + this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers; } /** @@ -84,7 +87,7 @@ interface RollbackStrategy extends Serializable { * @return */ private BaseRollbackPlanActionExecutor.RollbackStrategy getRollbackStrategy() { - if (config.shouldRollbackUsingMarkers()) { + if (shouldRollbackUsingMarkers) { return new MarkerBasedRollbackStrategy(table, context, config, instantTime); } else { return new ListingBasedRollbackStrategy(table, context, config, instantTime); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 85ad1364de9ca..7e25559bb1772 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -316,8 +316,9 @@ public Option scheduleCleaning(HoodieEngineContext context, S @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 5ad87e0831e97..a65e03da761d3 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -120,8 +120,9 @@ context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCom @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index a9e582110c206..4107adb8fb488 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -193,8 +193,9 @@ public void rollbackBootstrap(HoodieEngineContext context, @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 0971b87c44675..74d4718a932fc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -230,8 +230,9 @@ public Option scheduleCleaning(HoodieEngineContext context, S @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, - HoodieInstant instantToRollback, boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 9e053aaa0da44..75af5d0f685fc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -156,8 +156,9 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) { @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, - HoodieInstant instantToRollback, boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index caffb476b8409..86d18fe28b006 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -495,7 +495,7 @@ void assertNodupesInPartition(List records) { @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testUpserts(boolean populateMetaFields) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false); } @@ -506,7 +506,7 @@ public void testUpserts(boolean populateMetaFields) throws Exception { @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testUpsertsPrepped(boolean populateMetaFields) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true); } @@ -523,6 +523,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, throws Exception { // Force using older timeline layout HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) + .withRollbackUsingMarkers(true) .withProps(config.getProps()).withTimelineLayoutVersion( VERSION_0).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index cb468e903e59b..2305d7bdeb4d1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1307,7 +1307,7 @@ public void testCleanMarkerDataFilesOnRollback() throws Exception { new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); metaClient.reloadActiveTimeline(); HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"); - table.scheduleRollback(context, "001", rollbackInstant, false); + table.scheduleRollback(context, "001", rollbackInstant, false, config.shouldRollbackUsingMarkers()); table.rollback(context, "001", rollbackInstant, true, false); final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; assertEquals(0, numTempFilesAfter, "All temp files are deleted."); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 3225dcd04ea3f..1bee6ac0ac622 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -86,7 +86,8 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() // execute CopyOnWriteRollbackActionExecutor with filelisting mode BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false); + new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false, + table.getConfig().shouldRollbackUsingMarkers()); HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true, false); @@ -168,7 +169,8 @@ private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfi } BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false); + new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false, + table.getConfig().shouldRollbackUsingMarkers()); HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false, false); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 38be873e57ad6..5ec4000b266b6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -91,7 +91,8 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws //2. rollback HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002"); BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false); + new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false, + cfg.shouldRollbackUsingMarkers()); mergeOnReadRollbackPlanActionExecutor.execute().get(); MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor( context,