@@ -92,7 +92,7 @@ public void init() throws IOException {

// archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
-    archiveLog.archiveIfRequired();
+    archiveLog.archiveIfRequired(jsc);
}

@AfterEach
@@ -176,7 +176,7 @@ public void testShowArchivedCommits() throws IOException {
// archive
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
-    archiveLog.archiveIfRequired();
+    archiveLog.archiveIfRequired(jsc);

CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
assertTrue(cr.isSuccess());
@@ -337,7 +337,7 @@ protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, S
try {

// Delete the marker directory for the instant.
-      new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
+      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());

// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
@@ -349,7 +349,7 @@ protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, S
}
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
-      archiveLog.archiveIfRequired();
+      archiveLog.archiveIfRequired(jsc);
autoCleanOnCommit(instantTime);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
@@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism";
+  public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100";
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString();
@@ -235,6 +237,10 @@ public int getFinalizeWriteParallelism() {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}

+  public int getMarkersDeleteParallelism() {
+    return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM));
+  }

public boolean isEmbeddedTimelineServerEnabled() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
@@ -830,6 +836,11 @@ public Builder withFinalizeWriteParallelism(int parallelism) {
return this;
}

+    public Builder withMarkersDeleteParallelism(int parallelism) {
+      props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }

public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
return this;
@@ -874,6 +885,8 @@ public HoodieWriteConfig build() {
DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
DEFAULT_FINALIZE_WRITE_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM,
+          DEFAULT_MARKERS_DELETE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
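For context on the new knob: `hoodie.markers.delete.parallelism` caps the number of Spark tasks used when deleting marker sub-paths, and defaults to 100. Below is a minimal sketch of wiring it up through the builder added above; the table path and the value 300 are illustrative assumptions, not part of this PR.

```java
import org.apache.hudi.config.HoodieWriteConfig;

public class MarkerParallelismConfigExample {
  public static void main(String[] args) {
    // Raise marker-deletion parallelism from the default of 100 to 300.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")   // hypothetical base path
        .withMarkersDeleteParallelism(300)
        .build();

    // Reads back "hoodie.markers.delete.parallelism" as an int.
    System.out.println(config.getMarkersDeleteParallelism()); // 300
  }
}
```

If the property is never set, `build()` falls back to `DEFAULT_MARKERS_DELETE_PARALLELISM` via `setDefaultOnCondition`, so `getMarkersDeleteParallelism()` never sees a missing key.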
17 changes: 9 additions & 8 deletions hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -70,6 +70,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -428,21 +429,21 @@ protected void reconcileAgainstMarkers(JavaSparkContext jsc,
}

// we are not including log appends here, since they are already fail-safe.
-    List<String> invalidDataPaths = markers.createdAndMergedDataPaths();
-    List<String> validDataPaths = stats.stream()
+    Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism());
+    Set<String> validDataPaths = stats.stream()
         .map(HoodieWriteStat::getPath)
         .filter(p -> p.endsWith(this.getBaseFileExtension()))
-        .collect(Collectors.toList());
+        .collect(Collectors.toSet());

// Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);

if (!invalidDataPaths.isEmpty()) {
LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
}
-    Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
-        .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
-        .collect(Collectors.groupingBy(Pair::getKey));
+    Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
+        .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
+        .collect(Collectors.groupingBy(Pair::getKey));

if (!invalidPathsByPartition.isEmpty()) {
// Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
// Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
if (consistencyCheckEnabled) {
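Worth noting on the `List` to `Set` switch above: `List.removeAll` does a linear scan per element, O(n*m) overall, while `HashSet.removeAll` uses hashed lookups, roughly O(n + m), which matters when a large job leaves many marker paths behind. A self-contained sketch of the same reconciliation step, with made-up file names:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ReconcileMarkersSketch {
  public static void main(String[] args) {
    // Data paths discovered from marker files; Spark task retries can leave extras.
    Set<String> invalidDataPaths = new HashSet<>(Arrays.asList(
        "2020/06/28/file1_1-0-1_000.parquet",   // orphaned by a task retry
        "2020/06/28/file1_1-1-2_000.parquet")); // actually committed
    // Paths recorded in the commit's write stats.
    Set<String> validDataPaths = new HashSet<>(Arrays.asList(
        "2020/06/28/file1_1-1-2_000.parquet"));

    // Hash-based removal; with Lists this degenerates to repeated scans.
    invalidDataPaths.removeAll(validDataPaths);

    // Whatever remains is a partially written file to clean up.
    System.out.println(invalidDataPaths); // [2020/06/28/file1_1-0-1_000.parquet]
  }
}
```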
@@ -54,6 +54,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -121,15 +122,15 @@ private void close() {
/**
* Check if commits need to be archived. If yes, archive commits.
*/
-  public boolean archiveIfRequired() throws IOException {
+  public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());

boolean success = true;
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive);
-        archive(instantsToArchive);
+        archive(jsc, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
} else {
@@ -267,15 +268,15 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
return success;
}

-  public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
try {
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
try {
-        deleteAnyLeftOverMarkerFiles(hoodieInstant);
+        deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
@@ -293,9 +294,9 @@ public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
}
}

-  private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
+  private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) {
MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
-    if (markerFiles.deleteMarkerDir()) {
+    if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) {
LOG.info("Cleaned up left over marker directory for instant :" + instant);
}
}
100 changes: 76 additions & 24 deletions hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -18,8 +18,12 @@

package org.apache.hudi.table;

+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
@@ -28,26 +32,27 @@
import org.apache.hudi.io.IOType;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;

/**
* Operates on marker files for a given write action (commit, delta commit, compaction).
*/
-public class MarkerFiles {
+public class MarkerFiles implements Serializable {

private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);

-  public static String stripMarkerSuffix(String path) {
-    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
-  }
-
   private final String instantTime;
-  private final FileSystem fs;
-  private final Path markerDirPath;
+  private final transient FileSystem fs;
+  private final transient Path markerDirPath;
private final String basePath;

public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
@@ -64,44 +69,87 @@ public MarkerFiles(HoodieTable<?> table, String instantTime) {
instantTime);
}

-  public void quietDeleteMarkerDir() {
+  public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) {
     try {
-      deleteMarkerDir();
+      deleteMarkerDir(jsc, parallelism);
     } catch (HoodieIOException ioe) {
       LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
     }
   }

   /**
    * Delete Marker directory corresponding to an instant.
+   *
+   * @param jsc Java Spark Context.
+   * @param parallelism Spark parallelism for deletion.
    */
-  public boolean deleteMarkerDir() {
+  public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) {
     try {
-      boolean result = fs.delete(markerDirPath, true);
-      if (result) {
+      if (fs.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+          parallelism = Math.min(markerDirSubPaths.size(), parallelism);
+          jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          });
+        }
+
+        boolean result = fs.delete(markerDirPath, true);
         LOG.info("Removing marker directory at " + markerDirPath);
-      } else {
-        LOG.info("No marker directory to delete at " + markerDirPath);
+        return result;
       }
-      return result;
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
+    return false;
   }

(Review comment from a project member, anchored at `fileSystem.delete(subPath, true);`: "note to self: this will still work when subPath is a file. i.e non-partitioned tables")

public boolean doesMarkerDirExist() throws IOException {
return fs.exists(markerDirPath);
}

public List<String> createdAndMergedDataPaths() throws IOException {
List<String> dataFiles = new LinkedList<>();
FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
String pathStr = status.getPath().toString();
if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
dataFiles.add(translateMarkerToDataPath(pathStr));
public Set<String> createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException {
Set<String> dataFiles = new HashSet<>();

FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
List<String> subDirectories = new ArrayList<>();
for (FileStatus topLevelStatus: topLevelStatuses) {
if (topLevelStatus.isFile()) {
String pathStr = topLevelStatus.getPath().toString();
if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
dataFiles.add(translateMarkerToDataPath(pathStr));
}
} else {
subDirectories.add(topLevelStatus.getPath().toString());
}
return true;
}, false);
}

if (subDirectories.size() > 0) {
parallelism = Math.min(subDirectories.size(), parallelism);
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {
Path path = new Path(directory);
FileSystem fileSystem = path.getFileSystem(serializedConf.get());
RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
List<String> result = new ArrayList<>();
while (itr.hasNext()) {
FileStatus status = itr.next();
String pathStr = status.getPath().toString();
if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
result.add(translateMarkerToDataPath(pathStr));
}
}
return result.iterator();
}).collect());
}

return dataFiles;
}

@@ -110,6 +158,10 @@ private String translateMarkerToDataPath(String markerPath) {
return MarkerFiles.stripMarkerSuffix(rPath);
}

+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }

public List<String> allMarkerFilePaths() throws IOException {
List<String> markerFiles = new ArrayList<>();
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
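The core pattern behind the new `deleteMarkerDir`: wrap the Hadoop `Configuration` (not `Serializable` on its own) in Hudi's `SerializableConfiguration`, ship it to executors, and re-resolve a `FileSystem` per task. The helper below is a hypothetical distillation of that pattern under those assumptions, not code from this PR.

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

public class ParallelFsDeleteSketch {

  // Deletes each sub-path in its own Spark task.
  public static void deleteInParallel(JavaSparkContext jsc, FileSystem fs,
                                      List<String> subPaths, int parallelism) {
    if (subPaths.isEmpty()) {
      return;
    }
    // Hadoop's Configuration is not Serializable; wrap it before closing over it.
    SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
    int effectiveParallelism = Math.min(subPaths.size(), parallelism);
    jsc.parallelize(subPaths, effectiveParallelism).foreach(subPathStr -> {
      Path subPath = new Path(subPathStr);
      // Resolve the FileSystem on the executor from the shipped configuration.
      FileSystem fileSystem = subPath.getFileSystem(conf.get());
      // Recursive delete handles both directories (partitioned tables) and
      // plain files (non-partitioned tables), per the review note above.
      fileSystem.delete(subPath, true);
    });
  }
}
```

This also explains why `MarkerFiles` now implements `Serializable` with `transient` `fs` and `markerDirPath` fields: the `flatMap` lambda in `createdAndMergedDataPaths` calls the instance method `translateMarkerToDataPath`, so it captures `this`, while the non-serializable handles stay on the driver and are re-created on executors from the shipped configuration.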
@@ -113,7 +113,7 @@ public HoodieRollbackMetadata execute() {
}

// Finally, remove the marker files post rollback.
-    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir();
+    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());

return rollbackMetadata;
}