Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;

Expand All @@ -80,6 +82,8 @@
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -584,22 +588,40 @@ public void restoreToSavepoint(String savepointTime) {

@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
return rollback(commitInstantTime, false);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
return rollback(commitInstantTime, pendingRollbackInfo, false);
}

/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
*
* Adding this api for backwards compatability.
* @param commitInstantTime Instant time of the commit
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
@Deprecated
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
return rollback(commitInstantTime, Option.empty(), skipLocking);
}

/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
*
* @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
boolean pendingRollback = pendingRollbackInfo.isPresent();
final String rollbackInstantTime = pendingRollback ? pendingRollbackInfo.get().getRollbackInstant().getTimestamp() : HoodieActiveTimeline.createNewInstantTime();
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
Expand All @@ -608,7 +630,7 @@ public boolean rollback(final String commitInstantTime, boolean skipLocking) thr
.findFirst());
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollback ? Option.of(pendingRollbackInfo.get().getRollbackPlan()) : table.scheduleRollback(context, rollbackInstantTime,
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
if (rollbackPlanOption.isPresent()) {
// execute rollback
Expand Down Expand Up @@ -838,6 +860,29 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT
return inflightTimelineExcludeClusteringCommit;
}

private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfos(metaClient).get(commitToRollback);
return pendingRollbackInfo != null ? pendingRollbackInfo : Option.empty();
}

/**
* Fetch map of pending commits to be rolledback to {@link HoodiePendingRollbackInfo}.
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
* @return map of pending commits to be rolledback instants to Rollback Instnat and Rollback plan Pair.
*/
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map(
entry -> {
try {
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry);
return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan)));
} catch (IOException e) {
throw new HoodieIOException("Fetching rollback plan failed for " + entry, e);
}
}
).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

/**
* Rollback all failed writes.
*/
Expand All @@ -851,22 +896,28 @@ public Boolean rollbackFailedWrites() {
*/
public Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(),
Option.empty());
rollbackFailedWrites(instantsToRollback, skipLocking);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));

HashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = pendingRollbacks.entrySet()
.stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
rollbackFailedWrites(reverseSortedRollbackInstants, skipLocking);
return true;
}

protected void rollbackFailedWrites(List<String> instantsToRollback, boolean skipLocking) {
for (String instant : instantsToRollback) {
if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : instantsToRollback.entrySet()) {
if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
// do we need to handle failed rollback of a bootstrap
rollbackFailedBootstrap();
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
break;
} else {
rollback(instant, skipLocking);
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
rollback(entry.getKey(), entry.getValue(), skipLocking);
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class RollbackUtils {
* @return Rollback plan corresponding to rollback instant
* @throws IOException
*/
static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
public static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
throws IOException {
// TODO: add upgrade step if required.
return TimelineMetadataUtils.deserializeAvroMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.metrics.Registry;
Expand Down Expand Up @@ -442,7 +443,10 @@ protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<W
this.txnManager.beginTransaction();
try {
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true);
List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
this.rollbackFailedWrites(pendingRollbacks, true);
new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieCompactionConfig;
Expand Down Expand Up @@ -247,6 +248,84 @@ public void testRollbackCommit() throws Exception {
}
}

/**
* Test Cases for effects of rollbacking completed/inflight commits.
*/
@Test
public void testFailedRollbackCommit() throws Exception {
// Let's create some commit files and base files
final String p1 = "2016/05/01";
final String p2 = "2016/05/02";
final String p3 = "2016/05/06";
final String commitTime1 = "20160501010101";
final String commitTime2 = "20160502020601";
final String commitTime3 = "20160506030611";
Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
{
put(p1, "id11");
put(p2, "id12");
put(p3, "id13");
}
};
Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
{
put(p1, "id21");
put(p2, "id22");
put(p3, "id23");
}
};
Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
{
put(p1, "id31");
put(p2, "id32");
put(p3, "id33");
}
};
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {

// Rollback commit3
client.rollback(commitTime3);
assertFalse(testTable.inflightCommitExists(commitTime3));
assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));

metaClient.reloadActiveTimeline();
List<HoodieInstant> rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
assertEquals(rollbackInstants.size(), 1);
HoodieInstant rollbackInstant = rollbackInstants.get(0);

// delete rollback completed meta file and retry rollback.
FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp());

// recreate actual commit files so that we can retry the rollback
testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3);

// retry rolling back the commit again.
client.rollback(commitTime3);

// verify there are no extra rollback instants
metaClient.reloadActiveTimeline();
rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
assertEquals(rollbackInstants.size(), 1);
assertEquals(rollbackInstants.get(0), rollbackInstant);
}
}

/**
* Test auto-rollback of commits which are in flight.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common;

import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.table.timeline.HoodieInstant;

/**
* Holds rollback instant and rollback plan for a pending rollback.
*/
public class HoodiePendingRollbackInfo {

private final HoodieInstant rollbackInstant;
private final HoodieRollbackPlan rollbackPlan;

public HoodiePendingRollbackInfo(HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
this.rollbackInstant = rollbackInstant;
this.rollbackPlan = rollbackPlan;
}

public HoodieInstant getRollbackInstant() {
return rollbackInstant;
}

public HoodieRollbackPlan getRollbackPlan() {
return rollbackPlan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public HoodieTimeline filterPendingReplaceTimeline() {
s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details);
}

@Override
public HoodieTimeline filterPendingRollbackTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(
s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && !s.isCompleted()), details);
}

@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterPendingReplaceTimeline();

/**
* Filter this timeline to include pending rollbacks.
*/
HoodieTimeline filterPendingRollbackTimeline();

/**
* Create a new Timeline with all the instants after startTs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ public static void deleteReplaceCommit(String basePath, String instantTime) thro
removeMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION);
}

public static void deleteRollbackCommit(String basePath, String instantTime) throws IOException {
removeMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION);
}

public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException {
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
if (Files.notExists(parentPath)) {
Expand Down