diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index e7a4170ec7871..87ee7d94723d9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -18,7 +18,6 @@ package org.apache.hudi.table.action.rollback; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -27,16 +26,20 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -111,16 +114,18 @@ protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePat // NOTE: Since we're rolling back incomplete Delta Commit, it only could have appended its // block to the latest log-file // TODO(HUDI-1517) use provided marker-file's path instead - HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId, - HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get(); - - // NOTE: Marker's don't carry information about the cumulative size of the blocks that have been appended, - // therefore we simply stub this value. - Map logFilesWithBlocsToRollback = - Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L); + Option latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId, + HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime); + + Map logFilesWithBlocsToRollback = new HashMap<>(); + if (latestLogFileOption.isPresent()) { + HoodieLogFile latestLogFile = latestLogFileOption.get(); + // NOTE: Marker's don't carry information about the cumulative size of the blocks that have been appended, + // therefore we simply stub this value. + logFilesWithBlocsToRollback = Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L); + } return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logFilesWithBlocsToRollback); } - } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java index fd2af1cdca25a..67623709524ff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java @@ -80,6 +80,20 @@ public void tearDown() throws Exception { cleanupResources(); } + @Test + public void testMarkerBasedRollbackAppend() throws Exception { + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String f0 = testTable.addRequestedCommit("000") + .getFileIdsWithBaseFilesInPartitions("partA").get("partA"); + testTable.forCommit("001") + .withMarkerFile("partA", f0, IOType.APPEND); + + HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient); + List rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), + "002").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001")); + assertEquals(1, rollbackRequests.size()); + } + @Test public void testCopyOnWriteRollbackWithTestTable() throws Exception { // given: wrote some base files and corresponding markers