diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index c51c8ad69741b..d80a15c391c11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -82,7 +82,6 @@ import java.text.ParseException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -620,8 +619,7 @@ public boolean rollback(final String commitInstantTime, boolean skipLocking) thr @Deprecated public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException { LOG.info("Begin rollback of instant " + commitInstantTime); - boolean pendingRollback = pendingRollbackInfo.isPresent(); - final String rollbackInstantTime = pendingRollback ? pendingRollbackInfo.get().getRollbackInstant().getTimestamp() : HoodieActiveTimeline.createNewInstantTime(); + final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); final Timer.Context timerContext = this.metrics.getRollbackCtx(); try { HoodieTable table = createTable(config, hadoopConf); @@ -630,8 +628,8 @@ public boolean rollback(final String commitInstantTime, Option rollbackPlanOption = pendingRollback ? Option.of(pendingRollbackInfo.get().getRollbackPlan()) : table.scheduleRollback(context, rollbackInstantTime, - commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()); + Option rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())).orElse(table.scheduleRollback(context, rollbackInstantTime, + commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); if (rollbackPlanOption.isPresent()) { // execute rollback HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, @@ -861,8 +859,7 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT } private Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) { - Option pendingRollbackInfo = getPendingRollbackInfos(metaClient).get(commitToRollback); - return pendingRollbackInfo != null ? pendingRollbackInfo : Option.empty(); + return getPendingRollbackInfos(metaClient).getOrDefault(commitToRollback, Option.empty()); } /** @@ -899,16 +896,16 @@ public Boolean rollbackFailedWrites(boolean skipLocking) { List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); - - HashMap> reverseSortedRollbackInstants = pendingRollbacks.entrySet() - .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new)); - rollbackFailedWrites(reverseSortedRollbackInstants, skipLocking); + rollbackFailedWrites(pendingRollbacks, skipLocking); return true; } protected void rollbackFailedWrites(Map> instantsToRollback, boolean skipLocking) { - for (Map.Entry> entry : instantsToRollback.entrySet()) { + // sort in reverse order of commit times + LinkedHashMap> reverseSortedRollbackInstants = instantsToRollback.entrySet() + .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new)); + for (Map.Entry> entry : reverseSortedRollbackInstants.entrySet()) { if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { // do we need to handle failed rollback of a bootstrap