Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,62 +105,45 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
private HoodieRollbackMetadata runRollback(HoodieTable<T, I, K, O> table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
|| rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
try {
final HoodieInstant inflightInstant;
final HoodieTimer timer = new HoodieTimer();
timer.startTimer();
if (rollbackInstant.isRequested()) {
inflightInstant = table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant,
TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
} else {
inflightInstant = rollbackInstant;
}
final HoodieTimer timer = new HoodieTimer();
timer.startTimer();
final HoodieInstant inflightInstant = rollbackInstant.isRequested()
? table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant)
: rollbackInstant;

HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackPlan);
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
instantTime,
Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
finishRollback(inflightInstant, rollbackMetadata);
}
HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackPlan);
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
instantTime,
Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
finishRollback(inflightInstant, rollbackMetadata);
}

// Finally, remove the markers post rollback.
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
// Finally, remove the markers post rollback.
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

return rollbackMetadata;
} catch (IOException e) {
throw new HoodieIOException("Failed to rollback commit ", e);
}
return rollbackMetadata;
}

@Override
public HoodieRollbackMetadata execute() {
table.getMetaClient().reloadActiveTimeline();
List<HoodieInstant> rollBackInstants = table.getRollbackTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (rollBackInstants.isEmpty()) {
throw new HoodieRollbackException("No Requested Rollback Instants found to execute rollback ");
Option<HoodieInstant> rollbackInstant = table.getRollbackTimeline()
.filterInflightsAndRequested()
.filter(instant -> instant.getTimestamp().equals(instantTime))
.firstInstant();
if (!rollbackInstant.isPresent()) {
throw new HoodieRollbackException("No pending rollback instants found to execute rollback");
}
HoodieInstant rollbackInstant = null;
for (HoodieInstant instant : rollBackInstants) {
if (instantTime.equals(instant.getTimestamp())) {
rollbackInstant = instant;
break;
}
}
if (rollbackInstant != null) {
try {
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
return runRollback(table, rollBackInstants.get(0), rollbackPlan);
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch rollback plan to rollback commit " + rollbackInstant.getTimestamp(), e);
}
} else {
throw new HoodieIOException("No inflight rollback instants found for commit time " + instantTime);
try {
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant.get());
return runRollback(table, rollbackInstant.get(), rollbackPlan);
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch rollback plan for commit " + instantTime, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,13 @@ public HoodieInstant transitionRollbackInflightToComplete(HoodieInstant inflight
* Transition Rollback State from requested to inflight.
*
* @param requestedInstant requested instant
* @param data Optional data to be stored
* @return commit instant
*/
public HoodieInstant transitionRollbackRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
public HoodieInstant transitionRollbackRequestedToInflight(HoodieInstant requestedInstant) {
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
ValidationUtils.checkArgument(requestedInstant.isRequested());
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, ROLLBACK_ACTION, requestedInstant.getTimestamp());
transitionState(requestedInstant, inflight, data);
transitionState(requestedInstant, inflight, Option.empty());
return inflight;
}

Expand Down