diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index adcf394864634..c51c8ad69741b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -32,6 +32,7 @@ import org.apache.hudi.client.heartbeat.HeartbeatUtils; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.TransactionUtils; +import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -67,6 +68,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.action.savepoint.SavepointHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; @@ -80,6 +82,8 @@ import java.text.ParseException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -584,22 +588,40 @@ public void restoreToSavepoint(String savepointTime) { @Deprecated public boolean rollback(final String commitInstantTime) throws HoodieRollbackException { - return rollback(commitInstantTime, false); + HoodieTable table = createTable(config, hadoopConf); + Option pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); + return rollback(commitInstantTime, pendingRollbackInfo, false); } /** * @Deprecated * Rollback the inflight record 
changes with the given commit time. This * will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)} - * + * Adding this api for backwards compatibility. * @param commitInstantTime Instant time of the commit * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. * @throws HoodieRollbackException if rollback cannot be performed successfully */ @Deprecated public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException { + return rollback(commitInstantTime, Option.empty(), skipLocking); + } + + /** + * @Deprecated + * Rollback the inflight record changes with the given commit time. This + * will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)} + * + * @param commitInstantTime Instant time of the commit + * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt. + * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. + * @throws HoodieRollbackException if rollback cannot be performed successfully + */ + @Deprecated + public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException { LOG.info("Begin rollback of instant " + commitInstantTime); - final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime(); + boolean pendingRollback = pendingRollbackInfo.isPresent(); + final String rollbackInstantTime = pendingRollback ? 
pendingRollbackInfo.get().getRollbackInstant().getTimestamp() : HoodieActiveTimeline.createNewInstantTime(); final Timer.Context timerContext = this.metrics.getRollbackCtx(); try { HoodieTable table = createTable(config, hadoopConf); @@ -608,7 +630,7 @@ public boolean rollback(final String commitInstantTime, boolean skipLocking) thr .findFirst()); if (commitInstantOpt.isPresent()) { LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime); - Option rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, + Option rollbackPlanOption = pendingRollback ? Option.of(pendingRollbackInfo.get().getRollbackPlan()) : table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()); if (rollbackPlanOption.isPresent()) { // execute rollback @@ -838,6 +860,29 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT return inflightTimelineExcludeClusteringCommit; } + private Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) { + Option pendingRollbackInfo = getPendingRollbackInfos(metaClient).get(commitToRollback); + return pendingRollbackInfo != null ? pendingRollbackInfo : Option.empty(); + } + + /** + * Fetch map of pending commits to be rolled back to {@link HoodiePendingRollbackInfo}. If several pending rollbacks target the same commit, the first one encountered is kept. + * @param metaClient instance of {@link HoodieTableMetaClient} to use. + * @return map of commit time to be rolled back to the pending rollback instant and rollback plan. 
+ */ + protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient) { + return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map( + entry -> { + try { + HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry); + return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan))); + } catch (IOException e) { + throw new HoodieIOException("Fetching rollback plan failed for " + entry, e); + } + } + ).collect(Collectors.toMap(Pair::getKey, Pair::getValue, (first, second) -> first)); + } + /** * Rollback all failed writes. */ @@ -851,22 +896,28 @@ public Boolean rollbackFailedWrites() { */ public Boolean rollbackFailedWrites(boolean skipLocking) { HoodieTable table = createTable(config, hadoopConf); - List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), - Option.empty()); - rollbackFailedWrites(instantsToRollback, skipLocking); + List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); + Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + + HashMap> reverseSortedRollbackInstants = pendingRollbacks.entrySet() + .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new)); + rollbackFailedWrites(reverseSortedRollbackInstants, skipLocking); return true; } - protected void rollbackFailedWrites(List instantsToRollback, boolean skipLocking) { - for (String instant : instantsToRollback) { - if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, + protected void rollbackFailedWrites(Map> instantsToRollback, boolean skipLocking) { + for (Map.Entry> entry : instantsToRollback.entrySet()) { + if 
(HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { + // TODO: do we need to handle a failed rollback of a bootstrap? rollbackFailedBootstrap(); - HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config); + HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config); break; } else { - rollback(instant, skipLocking); - HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config); + rollback(entry.getKey(), entry.getValue(), skipLocking); + HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index 1338bfc46dc83..1de1a1363887d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -62,7 +62,7 @@ public class RollbackUtils { * @return Rollback plan corresponding to rollback instant * @throws IOException */ - static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant) + public static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant) throws IOException { // TODO: add upgrade step if required. 
return TimelineMetadataUtils.deserializeAvroMetadata( diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 6672028a672f2..5f09e0bfbc446 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -21,6 +21,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.TransactionUtils; +import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.metrics.Registry; @@ -442,7 +443,10 @@ protected HoodieTable>, JavaRDD, JavaRDD instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)); + Map> pendingRollbacks = getPendingRollbackInfos(metaClient); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + this.rollbackFailedWrites(pendingRollbacks, true); new UpgradeDowngrade( metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.current(), instantTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 6412113a59768..1076fe6efaec9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import 
org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.config.HoodieCompactionConfig; @@ -247,6 +248,84 @@ public void testRollbackCommit() throws Exception { } } + /** + * Test that retrying a failed rollback reuses the pending rollback instant instead of creating a new one. + */ + @Test + public void testFailedRollbackCommit() throws Exception { + // Let's create some commit files and base files + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String p3 = "2016/05/06"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + put(p3, "id13"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + put(p3, "id23"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + put(p3, "id33"); + } + }; + HoodieTestTable testTable = HoodieTestTable.of(metaClient) + .withPartitionMetaFiles(p1, p2, p3) + .addCommit(commitTime1) + .withBaseFilesInPartitions(partitionAndFileId1) + .addCommit(commitTime2) + .withBaseFilesInPartitions(partitionAndFileId2) + .addInflightCommit(commitTime3) + .withBaseFilesInPartitions(partitionAndFileId3); + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withRollbackUsingMarkers(false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + + try (SparkRDDWriteClient client = 
getHoodieWriteClient(config)) { + + // Rollback commit3 + client.rollback(commitTime3); + assertFalse(testTable.inflightCommitExists(commitTime3)); + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + + metaClient.reloadActiveTimeline(); + List rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList()); + assertEquals(1, rollbackInstants.size()); + HoodieInstant rollbackInstant = rollbackInstants.get(0); + + // delete rollback completed meta file and retry rollback. + FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp()); + + // recreate actual commit files so that we can retry the rollback + testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // retry rolling back the commit again. + client.rollback(commitTime3); + + // verify there are no extra rollback instants + metaClient.reloadActiveTimeline(); + rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList()); + assertEquals(1, rollbackInstants.size()); + assertEquals(rollbackInstant, rollbackInstants.get(0)); + } + } + /** * Test auto-rollback of commits which are in flight. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodiePendingRollbackInfo.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodiePendingRollbackInfo.java new file mode 100644 index 0000000000000..c53babf350102 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodiePendingRollbackInfo.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common; + +import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.common.table.timeline.HoodieInstant; + +/** + * Holds rollback instant and rollback plan for a pending rollback. + */ +public class HoodiePendingRollbackInfo { + + private final HoodieInstant rollbackInstant; + private final HoodieRollbackPlan rollbackPlan; + + public HoodiePendingRollbackInfo(HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) { + this.rollbackInstant = rollbackInstant; + this.rollbackPlan = rollbackPlan; + } + + public HoodieInstant getRollbackInstant() { + return rollbackInstant; + } + + public HoodieRollbackPlan getRollbackPlan() { + return rollbackPlan; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 3ebaba56d97c1..cfd77d12494a9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -124,6 +124,12 @@ public HoodieTimeline filterPendingReplaceTimeline() { s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details); } + @Override + public HoodieTimeline filterPendingRollbackTimeline() { + return new 
HoodieDefaultTimeline(instants.stream().filter( + s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && !s.isCompleted()), details); + } + @Override public HoodieTimeline filterPendingCompactionTimeline() { return new HoodieDefaultTimeline( diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 3c4d69f3d1a4d..0898498be63bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -157,6 +157,11 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterPendingReplaceTimeline(); + /** + * Filter this timeline to just include pending (not yet completed) rollback instants. + */ + HoodieTimeline filterPendingRollbackTimeline(); + /** * Create a new Timeline with all the instants after startTs. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 491ad32f90dfb..15215b8fc59b6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -340,6 +340,10 @@ public static void deleteReplaceCommit(String basePath, String instantTime) thro removeMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION); } + public static void deleteRollbackCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION); + } + public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException { Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); if (Files.notExists(parentPath)) {