Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -36,10 +37,19 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.marker.MarkerType.DIRECT;
import static org.apache.hudi.common.table.marker.MarkerType.TIMELINE_SERVER_BASED;
import static org.apache.hudi.common.util.MarkerUtils.MARKER_TYPE_FILENAME;
import static org.apache.hudi.common.util.MarkerUtils.readMarkerType;
import static org.apache.hudi.common.util.MarkerUtils.readTimelineServerBasedMarkersFromFileSystem;

/**
* A utility class for marker-based rollback.
*/
public class MarkerBasedRollbackUtils {

private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackUtils.class);

/**
* Gets all marker paths.
*
Expand All @@ -54,25 +64,35 @@ public static List<String> getAllMarkerPaths(HoodieTable table, HoodieEngineCont
String instant, int parallelism) throws IOException {
String markerDir = table.getMetaClient().getMarkerFolderPath(instant);
FileSystem fileSystem = table.getMetaClient().getFs();
Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir);
Option<MarkerType> markerTypeOption = readMarkerType(fileSystem, markerDir);

// If there is no marker type file "MARKERS.type", we assume "DIRECT" markers are used
// If there is no marker type file "MARKERS.type", first assume "DIRECT" markers are used.
// If not, then fallback to "TIMELINE_SERVER_BASED" markers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inspected the code again to see if there will be a chance where marker type will be missing in timeline server based markers. looks like we first create the marker type file before creating any markers. may be just the directory will exists w/o any marker type file and w/o any makers. But there should not be a case where marker type is missing and there are valid markers found incase of TimelineBasedMarkers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is one flow where we could end up in this situation, but I could not piece end to end fully together.
Lets say, rollback was triggered using timeline based markers. Towards the end, when everything is complete, we go ahead and delete the marker directory. During this, lets say marker type file was deleted, but before deleting all markers, process crashed. We might end up in this state. But from looking at the code, rollback is actually complete by now (i.e. original commit files would have been deleted, and infact rollback commit would also be completed by now. So, a failure during deletion of marker directory should not have any impact. rollback will never be re-triggered for this case.

Let me know if you guys can think of a scenario where this could happen. would really love to find the root cause.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's possible. However, it was reported in #5591 that the rollback failed.
The other thing is that the creation of marker type file is contingent on existence of marker dir. What if the first time marker dir got created but marker type file was not created, then on retry MarkerDirState#writeMarkerTypeToFile sees that the marker dr already exists so it skips marker type file creation and sets isMarkerTypeWritten to true and goes ahead with writing marker files. For such cases, we need to explicitly check whether marker type file exists or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense. thanks for finding the potential root cause.

if (!markerTypeOption.isPresent()) {
WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant);
return new ArrayList<>(writeMarkers.allMarkerFilePaths());
WriteMarkers writeMarkers = WriteMarkersFactory.get(DIRECT, table, instant);
try {
return new ArrayList<>(writeMarkers.allMarkerFilePaths());
} catch (IOException | IllegalArgumentException e) {
LOG.warn(String.format("%s not present and %s marker failed with error: %s. So, falling back to %s marker",
MARKER_TYPE_FILENAME, DIRECT, e.getMessage(), TIMELINE_SERVER_BASED));
return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem);
}
}

switch (markerTypeOption.get()) {
case TIMELINE_SERVER_BASED:
// Reads all markers written by the timeline server
Map<String, Set<String>> markersMap =
MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
markerDir, fileSystem, context, parallelism);
return markersMap.values().stream().flatMap(Collection::stream)
.collect(Collectors.toCollection(ArrayList::new));
return getTimelineServerBasedMarkers(context, parallelism, markerDir, fileSystem);
default:
throw new HoodieException(
"The marker type \"" + markerTypeOption.get().name() + "\" is not supported.");
}
}

private static List<String> getTimelineServerBasedMarkers(HoodieEngineContext context, int parallelism, String markerDir, FileSystem fileSystem) {
Map<String, Set<String>> markersMap = readTimelineServerBasedMarkersFromFileSystem(markerDir, fileSystem, context, parallelism);
return markersMap.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.hadoop.fs.FileStatus;
Expand All @@ -48,12 +49,16 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

@Tag("functional")
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
Expand Down Expand Up @@ -204,4 +209,21 @@ private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, HoodieW
rollbackRequests);
}

@Test
public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception {
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
.withMarkerFile("partA", f0, IOType.APPEND);

HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);

DirectWriteMarkers writeMarkers = mock(DirectWriteMarkers.class);
initMocks(this);
when(writeMarkers.allMarkerFilePaths()).thenThrow(new IOException("Markers.type file not present"));
MarkerBasedRollbackStrategy rollbackStrategy = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002");
List<HoodieRollbackRequest> rollbackRequests = rollbackStrategy.getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
assertEquals(1, rollbackRequests.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class MarkerUtils {
* @return marker file name
*/
public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) {
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN),
String.format("Using DIRECT markers but marker path does not contain extension: %s", HoodieTableMetaClient.MARKER_EXTN));
String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private void syncMarkersFromFileSystem() {
private void writeMarkerTypeToFile() {
Path dirPath = new Path(markerDirPath);
try {
if (!fileSystem.exists(dirPath)) {
if (!fileSystem.exists(dirPath) || !MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) {
// There is no existing marker directory, create a new directory and write marker type
fileSystem.mkdirs(dirPath);
MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath);
Expand Down