Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -80,19 +81,21 @@ public void init() throws Exception {
put(DEFAULT_THIRD_PARTITION_PATH, "file-3");
}
};
HoodieTestTable.of(metaClient)

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
metaClient.getHadoopConf(), config, context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
.withBaseFilesInPartitions(partitionAndFileId)
.addCommit("101")
.withBaseFilesInPartitions(partitionAndFileId)
.addInflightCommit("102")
.withBaseFilesInPartitions(partitionAndFileId);
// generate two rollback
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

// generate two rollback
try (BaseHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
// Rollback inflight commit3 and commit2
client.rollback("102");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
private final TransactionManager txnManager;
private final boolean skipLocking;

protected HoodieInstant resolvedInstant;

public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipLocking) {
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipLocking) {
this(context, config, table, instantTime, instantToRollback, deleteInstants,
false, config.shouldRollbackUsingMarkers(), skipLocking);
}
Expand All @@ -83,6 +85,7 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
boolean skipLocking) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.resolvedInstant = instantToRollback;
this.deleteInstants = deleteInstants;
this.skipTimelinePublish = skipTimelinePublish;
this.useMarkerBasedStrategy = useMarkerBasedStrategy;
Expand Down Expand Up @@ -118,9 +121,7 @@ private HoodieRollbackMetadata runRollback(HoodieTable<T, I, K, O> table, Hoodie
Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
finishRollback(inflightInstant, rollbackMetadata);
}
finishRollback(inflightInstant, rollbackMetadata);

// Finally, remove the markers post rollback.
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
Expand Down Expand Up @@ -237,18 +238,32 @@ protected List<HoodieRollbackStat> executeRollback(HoodieInstant instantToRollba
}

protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
boolean enableLocking = (!skipLocking && !skipTimelinePublish);
try {
if (!skipLocking) {
if (enableLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}
writeTableMetadata(rollbackMetadata);
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");

// If publish the rollback to the timeline, we first write the rollback metadata
// to metadata table
if (!skipTimelinePublish) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should see if we can make this more readable or nicer. I will approve it for now. but see if we can improve this sometime.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I'll refactor this later on.

writeTableMetadata(rollbackMetadata);
}

// Then we delete the inflight instant in the data table timeline if enabled
deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);

// If publish the rollback to the timeline, we finally transition the inflight rollback
// to complete in the data table timeline
if (!skipTimelinePublish) {
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
}
} catch (IOException e) {
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
} finally {
if (!skipLocking) {
if (enableLocking) {
this.txnManager.endTransaction(Option.empty());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ protected List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRoll

List<HoodieRollbackStat> stats = new ArrayList<>();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant resolvedInstant = instantToRollback;

if (instantToRollback.isCompleted()) {
LOG.info("Unpublishing instant " + instantToRollback);
Expand All @@ -86,8 +85,6 @@ protected List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRoll

dropBootstrapIndexIfNeeded(instantToRollback);

// Delete Inflight instant if enabled
deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return stats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ protected List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRoll

LOG.info("Rolling back instant " + instantToRollback);

HoodieInstant resolvedInstant = instantToRollback;
// Atomically un-publish all non-inflight commits
if (instantToRollback.isCompleted()) {
LOG.info("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
Expand All @@ -93,8 +92,6 @@ protected List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRoll

dropBootstrapIndexIfNeeded(resolvedInstant);

// Delete Inflight instants if enabled
deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return allRollbackStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile()
HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build();
HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");

String rollbackInstant = "003";
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false,
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, rollbackInstant, needRollBackInstant, false,
table.getConfig().shouldRollbackUsingMarkers());
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true,
Expand Down Expand Up @@ -125,7 +125,9 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile()
assertTrue(testTable.commitExists("001"));
assertTrue(testTable.baseFileExists(p1, "001", "id11"));
assertTrue(testTable.baseFileExists(p2, "001", "id12"));
assertFalse(testTable.inflightCommitExists("002"));
// Note that executeRollback() does not delete inflight instant files
// The deletion is done in finishRollback() called by runRollback()
assertTrue(testTable.inflightCommitExists("002"));
assertFalse(testTable.commitExists("002"));
assertFalse(testTable.baseFileExists(p1, "002", "id21"));
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
Expand Down