Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ public boolean rollback(final String commitInstantTime, boolean skipLocking) thr
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
commitInstantOpt.get(), false);
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
if (rollbackPlanOption.isPresent()) {
// execute rollback
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
Expand Down Expand Up @@ -1024,7 +1024,7 @@ protected Option<String> inlineCluster(Option<Map<String, String>> extraMetadata

protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, inflightInstant, false);
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
table.rollback(context, commitTime, inflightInstant, false, false);
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,13 @@ public abstract Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext c
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling rollback
* @param instantToRollback instant to be rolled back
* @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false.
* @return HoodieRollbackPlan containing info on rollback.
*/
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
String instantTime,
HoodieInstant instantToRollback,
boolean skipTimelinePublish);
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers);

/**
* Rollback the (inflight/committed) record changes with the given commit time.
Expand Down Expand Up @@ -490,7 +491,7 @@ public abstract HoodieRestoreMetadata restore(HoodieEngineContext context,
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
scheduleRollback(context, commitTime, inflightInstant, false);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
}
table.getMetaClient().reloadActiveTimeline();
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, newInstantTime, instantToRollback, false);
table.scheduleRollback(context, newInstantTime, instantToRollback, false, false);
table.getMetaClient().reloadActiveTimeline();
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
}
table.getMetaClient().reloadActiveTimeline();
String instantTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, instantTime, instantToRollback, false);
table.scheduleRollback(context, instantTime, instantToRollback, false, false);
table.getMetaClient().reloadActiveTimeline();
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class BaseRollbackPlanActionExecutor<T extends HoodieRecordPayload, I, K,

protected final HoodieInstant instantToRollback;
private final boolean skipTimelinePublish;
private final boolean shouldRollbackUsingMarkers;

public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
public static final Integer LATEST_ROLLBACK_PLAN_VERSION = ROLLBACK_PLAN_VERSION_1;
Expand All @@ -59,10 +60,12 @@ public BaseRollbackPlanActionExecutor(HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant instantToRollback,
boolean skipTimelinePublish) {
boolean skipTimelinePublish,
boolean shouldRollbackUsingMarkers) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.skipTimelinePublish = skipTimelinePublish;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers;
}

/**
Expand All @@ -84,7 +87,7 @@ interface RollbackStrategy extends Serializable {
* @return
*/
private BaseRollbackPlanActionExecutor.RollbackStrategy getRollbackStrategy() {
if (config.shouldRollbackUsingMarkers()) {
if (shouldRollbackUsingMarkers) {
return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return new ListingBasedRollbackStrategy(table, context, config, instantTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,9 @@ public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, S

@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCom

@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ public void rollbackBootstrap(HoodieEngineContext context,

@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, S
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
String instantTime,
HoodieInstant instantToRollback, boolean skipTimelinePublish) {
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
String instantTime,
HoodieInstant instantToRollback, boolean skipTimelinePublish) {
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ void assertNodupesInPartition(List<HoodieRecord> records) {
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testUpserts(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false);
}
Expand All @@ -506,7 +506,7 @@ public void testUpserts(boolean populateMetaFields) throws Exception {
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testUpsertsPrepped(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true);
}
Expand All @@ -523,6 +523,7 @@ private void testUpsertsInternal(HoodieWriteConfig config,
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withRollbackUsingMarkers(true)
.withProps(config.getProps()).withTimelineLayoutVersion(
VERSION_0).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,7 @@ public void testCleanMarkerDataFilesOnRollback() throws Exception {
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
metaClient.reloadActiveTimeline();
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
table.scheduleRollback(context, "001", rollbackInstant, false);
table.scheduleRollback(context, "001", rollbackInstant, false, config.shouldRollbackUsingMarkers());
table.rollback(context, "001", rollbackInstant, true, false);
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile()

// execute CopyOnWriteRollbackActionExecutor with filelisting mode
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false);
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false,
table.getConfig().shouldRollbackUsingMarkers());
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true,
false);
Expand Down Expand Up @@ -168,7 +169,8 @@ private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfi
}

BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false);
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false,
table.getConfig().shouldRollbackUsingMarkers());
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
//2. rollback
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false);
new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false,
cfg.shouldRollbackUsingMarkers());
mergeOnReadRollbackPlanActionExecutor.execute().get();
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
Expand Down