diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java index 9a10893b35e89..0aec7c5cec769 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java @@ -32,11 +32,12 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -80,7 +81,12 @@ public void init() throws Exception { put(DEFAULT_THIRD_PARTITION_PATH, "file-3"); } }; - HoodieTestTable.of(metaClient) + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withRollbackUsingMarkers(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create( + metaClient.getHadoopConf(), config, context)) .withPartitionMetaFiles(DEFAULT_PARTITION_PATHS) .addCommit("100") .withBaseFilesInPartitions(partitionAndFileId) @@ -88,11 +94,8 @@ public void init() throws Exception { .withBaseFilesInPartitions(partitionAndFileId) .addInflightCommit("102") .withBaseFilesInPartitions(partitionAndFileId); - // generate two rollback - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath) - .withRollbackUsingMarkers(false) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + // generate two rollback try (BaseHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) { // Rollback inflight commit3 and commit2 client.rollback("102"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 9d5895de83b17..d44ba5590e938 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -61,13 +61,15 @@ public abstract class BaseRollbackActionExecutor table, - String instantTime, - HoodieInstant instantToRollback, - boolean deleteInstants, - boolean skipLocking) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants, + boolean skipLocking) { this(context, config, table, instantTime, instantToRollback, deleteInstants, false, config.shouldRollbackUsingMarkers(), skipLocking); } @@ -83,6 +85,7 @@ public BaseRollbackActionExecutor(HoodieEngineContext context, boolean skipLocking) { super(context, config, table, instantTime); this.instantToRollback = instantToRollback; + this.resolvedInstant = instantToRollback; this.deleteInstants = deleteInstants; this.skipTimelinePublish = skipTimelinePublish; this.useMarkerBasedStrategy = useMarkerBasedStrategy; @@ -118,9 +121,7 @@ private HoodieRollbackMetadata runRollback(HoodieTable table, Hoodie Option.of(rollbackTimer.endTimer()), Collections.singletonList(instantToRollback), stats); - if (!skipTimelinePublish) { - finishRollback(inflightInstant, rollbackMetadata); - } + finishRollback(inflightInstant, rollbackMetadata); // Finally, remove the markers post rollback. WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()) @@ -237,18 +238,32 @@ protected List executeRollback(HoodieInstant instantToRollba } protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException { + boolean enableLocking = (!skipLocking && !skipTimelinePublish); try { - if (!skipLocking) { + if (enableLocking) { this.txnManager.beginTransaction(Option.empty(), Option.empty()); } - writeTableMetadata(rollbackMetadata); - table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant, - TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); - LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete"); + + // If publish the rollback to the timeline, we first write the rollback metadata + // to metadata table + if (!skipTimelinePublish) { + writeTableMetadata(rollbackMetadata); + } + + // Then we delete the inflight instant in the data table timeline if enabled + deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant); + + // If publish the rollback to the timeline, we finally transition the inflight rollback + // to complete in the data table timeline + if (!skipTimelinePublish) { + table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant, + TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); + LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete"); + } } catch (IOException e) { throw new HoodieIOException("Error executing rollback at instant " + instantTime, e); } finally { - if (!skipLocking) { + if (enableLocking) { this.txnManager.endTransaction(Option.empty()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java index 5e11354303f26..5315ce713eef3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java @@ -67,7 +67,6 @@ protected List executeRollback(HoodieRollbackPlan hoodieRoll List stats = new ArrayList<>(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieInstant resolvedInstant = instantToRollback; if (instantToRollback.isCompleted()) { LOG.info("Unpublishing instant " + instantToRollback); @@ -86,8 +85,6 @@ protected List executeRollback(HoodieRollbackPlan hoodieRoll dropBootstrapIndexIfNeeded(instantToRollback); - // Delete Inflight instant if enabled - deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant); LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer()); return stats; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java index c2b25ffc5bf5a..e4054e9221969 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -67,7 +67,6 @@ protected List executeRollback(HoodieRollbackPlan hoodieRoll LOG.info("Rolling back instant " + instantToRollback); - HoodieInstant resolvedInstant = instantToRollback; // Atomically un-publish all non-inflight commits if (instantToRollback.isCompleted()) { LOG.info("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants); @@ -93,8 +92,6 @@ protected List executeRollback(HoodieRollbackPlan hoodieRoll dropBootstrapIndexIfNeeded(resolvedInstant); - // Delete Inflight instants if enabled - deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant); LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer()); return allRollbackStats; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 1bee6ac0ac622..e5dd5b087aa23 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -83,10 +83,10 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build(); HoodieTable table = this.getHoodieTable(metaClient, writeConfig); HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002"); - + String rollbackInstant = "003"; // execute CopyOnWriteRollbackActionExecutor with filelisting mode BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false, + new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, rollbackInstant, needRollBackInstant, false, table.getConfig().shouldRollbackUsingMarkers()); HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true, @@ -125,7 +125,9 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() assertTrue(testTable.commitExists("001")); assertTrue(testTable.baseFileExists(p1, "001", "id11")); assertTrue(testTable.baseFileExists(p2, "001", "id12")); - assertFalse(testTable.inflightCommitExists("002")); + // Note that executeRollback() does not delete inflight instant files + // The deletion is done in finishRollback() called by runRollback() + assertTrue(testTable.inflightCommitExists("002")); assertFalse(testTable.commitExists("002")); assertFalse(testTable.baseFileExists(p1, "002", "id21")); assertFalse(testTable.baseFileExists(p2, "002", "id22"));