# Review notes (text before the first "diff --git" line is ignored by git apply):
# - MarkerBasedRollbackUtils: the fallback warning now passes the caught exception to LOG.warn so the
#   stack trace is kept, and the new helper collects into an ArrayList as the replaced inline code did.
# - TestMarkerBasedRollbackStrategy: dropped the Mockito mock that was never handed to the code under
#   test, together with the imports only it needed.
# - MarkerDirState: the comment inside the widened condition now describes both cases.
# - The text this was rebuilt from had its line breaks, indentation and generic type arguments
#   stripped; those are reconstructed from the hunk line counts, so verify the context lines against
#   the tree (or apply with --ignore-whitespace) before use. The "index" lines are kept as received.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
index 9d1f37abdb53a..4d2f9e4e80630 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java
@@ -21,12 +21,13 @@
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.util.MarkerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,10 +37,19 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.table.marker.MarkerType.DIRECT;
+import static org.apache.hudi.common.table.marker.MarkerType.TIMELINE_SERVER_BASED;
+import static org.apache.hudi.common.util.MarkerUtils.MARKER_TYPE_FILENAME;
+import static org.apache.hudi.common.util.MarkerUtils.readMarkerType;
+import static org.apache.hudi.common.util.MarkerUtils.readTimelineServerBasedMarkersFromFileSystem;
+
 /**
  * A utility class for marker-based rollback.
  */
 public class MarkerBasedRollbackUtils {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackUtils.class);
+
   /**
    * Gets all marker paths.
    *
@@ -54,25 +64,46 @@ public static List<String> getAllMarkerPaths(HoodieTable table, HoodieEngineCont
                                                String instant, int parallelism) throws IOException {
     String markerDir = table.getMetaClient().getMarkerFolderPath(instant);
     FileSystem fileSystem = table.getMetaClient().getFs();
-    Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir);
+    Option<MarkerType> markerTypeOption = readMarkerType(fileSystem, markerDir);
 
-    // If there is no marker type file "MARKERS.type", we assume "DIRECT" markers are used
+    // If there is no marker type file "MARKERS.type", first assume "DIRECT" markers are used.
+    // If reading them fails, fall back to "TIMELINE_SERVER_BASED" markers.
     if (!markerTypeOption.isPresent()) {
-      WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant);
-      return new ArrayList<>(writeMarkers.allMarkerFilePaths());
+      WriteMarkers writeMarkers = WriteMarkersFactory.get(DIRECT, table, instant);
+      try {
+        return new ArrayList<>(writeMarkers.allMarkerFilePaths());
+      } catch (IOException | IllegalArgumentException e) {
+        // Hand the exception to the logger as well so the stack trace of the failed DIRECT read is kept.
+        LOG.warn(String.format("%s not present and %s marker failed with error: %s. So, falling back to %s marker",
+            MARKER_TYPE_FILENAME, DIRECT, e.getMessage(), TIMELINE_SERVER_BASED), e);
+        return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem);
+      }
     }
 
     switch (markerTypeOption.get()) {
       case TIMELINE_SERVER_BASED:
         // Reads all markers written by the timeline server
-        Map<String, Set<String>> markersMap =
-            MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
-                markerDir, fileSystem, context, parallelism);
-        return markersMap.values().stream().flatMap(Collection::stream)
-            .collect(Collectors.toCollection(ArrayList::new));
+        return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem);
       default:
         throw new HoodieException(
             "The marker type \"" + markerTypeOption.get().name() + "\" is not supported.");
     }
   }
+
+  /**
+   * Reads the markers stored in the timeline-server-based format under the marker directory
+   * and flattens them into a single list.
+   *
+   * @param context     engine context passed through to the marker reader
+   * @param parallelism parallelism passed through to the marker reader
+   * @param markerDir   marker directory to read from
+   * @param fileSystem  file system holding the marker directory
+   * @return a new {@link ArrayList} with every marker path from all entries of the map returned by the reader
+   */
+  private static List<String> getTimelineServerBasedMarkers(HoodieEngineContext context, int parallelism, String markerDir, FileSystem fileSystem) {
+    Map<String, Set<String>> markersMap = readTimelineServerBasedMarkersFromFileSystem(markerDir, fileSystem, context, parallelism);
+    return markersMap.values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toCollection(ArrayList::new));
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 67623709524ff..927f8f3c24b82 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -204,4 +204,21 @@ private List testRun(boolean useFileListingMetadata, HoodieW
         rollbackRequests);
   }
 
+  // NOTE(review): nothing in this test forces the DIRECT marker read to fail, so it only asserts that one
+  // rollback request is produced for the marker written below.
+  // TODO: write timeline-server-based marker files without a marker type file to exercise the fallback itself.
+  @Test
+  public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception {
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String f0 = testTable.addRequestedCommit("000")
+        .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
+    testTable.forCommit("001")
+        .withMarkerFile("partA", f0, IOType.APPEND);
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
+
+    MarkerBasedRollbackStrategy rollbackStrategy = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002");
+    HoodieInstant instantToRollback = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001");
+    assertEquals(1, rollbackStrategy.getRollbackRequests(instantToRollback).size());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
index 555a036b9f834..0aff8f594a5df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
@@ -64,7 +64,8 @@ public class MarkerUtils {
    * @return marker file name
    */
   public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) {
-    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
+    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN),
+        String.format("Using DIRECT markers but marker path does not contain extension: %s", HoodieTableMetaClient.MARKER_EXTN));
     String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
         new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
     return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath);
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
index 8ba9abf0eb127..67e850bb7d8fa 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
@@ -273,7 +273,7 @@ private void syncMarkersFromFileSystem() {
   private void writeMarkerTypeToFile() {
     Path dirPath = new Path(markerDirPath);
     try {
-      if (!fileSystem.exists(dirPath)) {
-        // There is no existing marker directory, create a new directory and write marker type
+      if (!fileSystem.exists(dirPath) || !MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) {
+        // The marker directory or its marker type file is missing: make sure the directory exists and write the marker type
         fileSystem.mkdirs(dirPath);
         MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath);