Skip to content

Commit 56d08fb

Browse files
authored
[HUDI-2351] Extract common FS and IO utils for marker mechanism (apache#3529)
1 parent 57c8113 commit 56d08fb

File tree

8 files changed

+301
-110
lines changed

8 files changed

+301
-110
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java

+1-27
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,9 @@
4040

4141
import java.io.IOException;
4242
import java.util.ArrayList;
43-
import java.util.Arrays;
4443
import java.util.HashSet;
4544
import java.util.List;
4645
import java.util.Set;
47-
import java.util.stream.Collectors;
4846

4947
/**
5048
* Marker operations of directly accessing the file system to create and delete
@@ -74,31 +72,7 @@ public DirectWriteMarkers(HoodieTable table, String instantTime) {
7472
* @param parallelism parallelism for deletion.
7573
*/
7674
public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
77-
try {
78-
if (fs.exists(markerDirPath)) {
79-
FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
80-
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
81-
.map(fileStatus -> fileStatus.getPath().toString())
82-
.collect(Collectors.toList());
83-
84-
if (markerDirSubPaths.size() > 0) {
85-
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
86-
parallelism = Math.min(markerDirSubPaths.size(), parallelism);
87-
context.foreach(markerDirSubPaths, subPathStr -> {
88-
Path subPath = new Path(subPathStr);
89-
FileSystem fileSystem = subPath.getFileSystem(conf.get());
90-
fileSystem.delete(subPath, true);
91-
}, parallelism);
92-
}
93-
94-
boolean result = fs.delete(markerDirPath, true);
95-
LOG.info("Removing marker directory at " + markerDirPath);
96-
return result;
97-
}
98-
} catch (IOException ioe) {
99-
throw new HoodieIOException(ioe.getMessage(), ioe);
100-
}
101-
return false;
75+
return FSUtils.deleteDir(context, fs, markerDirPath, parallelism);
10276
}
10377

10478
/**

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java

+9-18
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,11 @@
3232
import org.apache.hudi.table.HoodieTable;
3333
import org.apache.hudi.table.marker.DirectWriteMarkers;
3434

35-
import com.esotericsoftware.minlog.Log;
3635
import org.apache.hadoop.fs.FileStatus;
3736
import org.apache.hadoop.fs.FileSystem;
3837
import org.apache.hadoop.fs.Path;
3938

4039
import java.io.IOException;
41-
import java.util.Arrays;
4240
import java.util.Collection;
4341
import java.util.Collections;
4442
import java.util.List;
@@ -108,7 +106,8 @@ private void convertToDirectMarkers(final String commitInstantTime,
108106
// Deletes marker type file
109107
MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
110108
// Deletes timeline server based markers
111-
deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
109+
deleteTimelineBasedMarkerFiles(
110+
context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
112111
break;
113112
default:
114113
throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
@@ -117,26 +116,18 @@ private void convertToDirectMarkers(final String commitInstantTime,
117116
} else {
118117
// In case of partial failures during downgrade, there is a chance that marker type file was deleted,
119118
// but timeline server based marker files are left. So deletes them if any
120-
deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
119+
deleteTimelineBasedMarkerFiles(
120+
context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
121121
}
122122
}
123123

124-
private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException {
124+
private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir,
125+
FileSystem fileSystem, int parallelism) throws IOException {
125126
// Deletes timeline based marker files if any.
126-
Path dirPath = new Path(markerDir);
127-
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
128127
Predicate<FileStatus> prefixFilter = fileStatus ->
129128
fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
130-
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
131-
.filter(prefixFilter)
132-
.map(fileStatus -> fileStatus.getPath().toString())
133-
.collect(Collectors.toList());
134-
markerDirSubPaths.forEach(fileToDelete -> {
135-
try {
136-
fileSystem.delete(new Path(fileToDelete), false);
137-
} catch (IOException e) {
138-
Log.warn("Deleting Timeline based marker files failed ", e);
139-
}
140-
});
129+
FSUtils.parallelizeSubPathProcess(context, fileSystem, new Path(markerDir), parallelism,
130+
prefixFilter, pairOfSubPathAndConf ->
131+
FSUtils.deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue(), false));
141132
}
142133
}

hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java

+87
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hudi.common.table.timeline.HoodieInstant;
3030
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
3131
import org.apache.hudi.common.util.Option;
32+
import org.apache.hudi.common.util.collection.ImmutablePair;
3233
import org.apache.hudi.common.util.collection.Pair;
3334
import org.apache.hudi.exception.HoodieException;
3435
import org.apache.hudi.exception.HoodieIOException;
@@ -49,8 +50,10 @@
4950
import java.io.File;
5051
import java.io.FileNotFoundException;
5152
import java.io.IOException;
53+
import java.io.Serializable;
5254
import java.util.ArrayList;
5355
import java.util.Arrays;
56+
import java.util.HashMap;
5457
import java.util.HashSet;
5558
import java.util.List;
5659
import java.util.Map;
@@ -59,6 +62,7 @@
5962
import java.util.Set;
6063
import java.util.UUID;
6164
import java.util.function.Function;
65+
import java.util.function.Predicate;
6266
import java.util.regex.Matcher;
6367
import java.util.regex.Pattern;
6468
import java.util.stream.Collectors;
@@ -612,4 +616,87 @@ public static List<FileStatus> getGlobStatusExcludingMetaFolder(FileSystem fs, P
612616
.filter(fileStatus -> !fileStatus.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))
613617
.collect(Collectors.toList());
614618
}
619+
620+
/**
621+
* Deletes a directory by deleting sub-paths in parallel on the file system.
622+
*
623+
* @param hoodieEngineContext {@code HoodieEngineContext} instance
624+
* @param fs file system
625+
* @param dirPath directory path
626+
* @param parallelism parallelism to use for sub-paths
627+
* @return {@code true} if the directory is delete; {@code false} otherwise.
628+
*/
629+
public static boolean deleteDir(
630+
HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism) {
631+
try {
632+
if (fs.exists(dirPath)) {
633+
FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, parallelism, e -> true,
634+
pairOfSubPathAndConf -> deleteSubPath(
635+
pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue(), true)
636+
);
637+
boolean result = fs.delete(dirPath, false);
638+
LOG.info("Removed directory at " + dirPath);
639+
return result;
640+
}
641+
} catch (IOException ioe) {
642+
throw new HoodieIOException(ioe.getMessage(), ioe);
643+
}
644+
return false;
645+
}
646+
647+
/**
648+
* Processes sub-path in parallel.
649+
*
650+
* @param hoodieEngineContext {@code HoodieEngineContext} instance
651+
* @param fs file system
652+
* @param dirPath directory path
653+
* @param parallelism parallelism to use for sub-paths
654+
* @param subPathPredicate predicate to use to filter sub-paths for processing
655+
* @param pairFunction actual processing logic for each sub-path
656+
* @param <T> type of result to return for each sub-path
657+
* @return a map of sub-path to result of the processing
658+
*/
659+
public static <T> Map<String, T> parallelizeSubPathProcess(
660+
HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism,
661+
Predicate<FileStatus> subPathPredicate, SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction) {
662+
Map<String, T> result = new HashMap<>();
663+
try {
664+
FileStatus[] fileStatuses = fs.listStatus(dirPath);
665+
List<String> subPaths = Arrays.stream(fileStatuses)
666+
.filter(subPathPredicate)
667+
.map(fileStatus -> fileStatus.getPath().toString())
668+
.collect(Collectors.toList());
669+
if (subPaths.size() > 0) {
670+
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
671+
int actualParallelism = Math.min(subPaths.size(), parallelism);
672+
result = hoodieEngineContext.mapToPair(subPaths,
673+
subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
674+
actualParallelism);
675+
}
676+
} catch (IOException ioe) {
677+
throw new HoodieIOException(ioe.getMessage(), ioe);
678+
}
679+
return result;
680+
}
681+
682+
/**
683+
* Deletes a sub-path.
684+
*
685+
* @param subPathStr sub-path String
686+
* @param conf serializable config
687+
* @param recursive is recursive or not
688+
* @return {@code true} if the sub-path is deleted; {@code false} otherwise.
689+
*/
690+
public static boolean deleteSubPath(String subPathStr, SerializableConfiguration conf, boolean recursive) {
691+
try {
692+
Path subPath = new Path(subPathStr);
693+
FileSystem fileSystem = subPath.getFileSystem(conf.get());
694+
return fileSystem.delete(subPath, recursive);
695+
} catch (IOException e) {
696+
throw new HoodieIOException(e.getMessage(), e);
697+
}
698+
}
699+
700+
/**
 * A {@link Function} that is also {@link Serializable}.
 *
 * <p>Used as the parameter type of {@code parallelizeSubPathProcess}, where the function is
 * captured by the lambda handed to the engine context.
 *
 * @param <T> type of the input to the function
 * @param <R> type of the result of the function
 */
@FunctionalInterface
public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}
615702
}

hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java

+19
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,23 @@
2121
import org.apache.log4j.LogManager;
2222
import org.apache.log4j.Logger;
2323

24+
import java.io.BufferedReader;
2425
import java.io.ByteArrayOutputStream;
2526
import java.io.Closeable;
2627
import java.io.File;
2728
import java.io.FileOutputStream;
2829
import java.io.IOException;
2930
import java.io.InputStream;
31+
import java.io.InputStreamReader;
3032
import java.io.OutputStream;
3133
import java.io.PrintStream;
3234
import java.nio.charset.StandardCharsets;
3335
import java.nio.file.Files;
3436
import java.nio.file.Path;
37+
import java.util.ArrayList;
3538
import java.util.Comparator;
39+
import java.util.List;
40+
import java.util.stream.Collectors;
3641

3742
/**
3843
* Bunch of utility methods for working with files and byte streams.
@@ -71,6 +76,20 @@ public static String readAsUTFString(InputStream input, int length) throws IOExc
7176
return new String(bos.toByteArray(), StandardCharsets.UTF_8);
7277
}
7378

79+
/**
80+
* Reads the input stream into String lines.
81+
*
82+
* @param input {@code InputStream} instance.
83+
* @return String lines in a list.
84+
*/
85+
public static List<String> readAsUTFStringLines(InputStream input) {
86+
List<String> lines = new ArrayList<>();
87+
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
88+
lines = bufferedReader.lines().collect(Collectors.toList());
89+
closeQuietly(bufferedReader);
90+
return lines;
91+
}
92+
7493
public static void copy(InputStream inputStream, OutputStream outputStream) throws IOException {
7594
byte[] buffer = new byte[1024];
7695
int len;

hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java

+31-35
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import org.apache.hudi.common.config.SerializableConfiguration;
2323
import org.apache.hudi.common.engine.HoodieEngineContext;
24+
import org.apache.hudi.common.fs.FSUtils;
2425
import org.apache.hudi.common.table.HoodieTableMetaClient;
2526
import org.apache.hudi.common.table.marker.MarkerType;
26-
import org.apache.hudi.common.util.collection.ImmutablePair;
2727
import org.apache.hudi.exception.HoodieException;
2828
import org.apache.hudi.exception.HoodieIOException;
2929

@@ -35,20 +35,15 @@
3535
import org.apache.log4j.LogManager;
3636
import org.apache.log4j.Logger;
3737

38-
import java.io.BufferedReader;
3938
import java.io.BufferedWriter;
4039
import java.io.IOException;
41-
import java.io.InputStreamReader;
4240
import java.io.OutputStreamWriter;
4341
import java.nio.charset.StandardCharsets;
44-
import java.util.Arrays;
4542
import java.util.HashMap;
4643
import java.util.HashSet;
47-
import java.util.List;
4844
import java.util.Map;
4945
import java.util.Set;
5046
import java.util.function.Predicate;
51-
import java.util.stream.Collectors;
5247

5348
import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
5449

@@ -178,43 +173,44 @@ public static Map<String, Set<String>> readTimelineServerBasedMarkersFromFileSys
178173
Path dirPath = new Path(markerDir);
179174
try {
180175
if (fileSystem.exists(dirPath)) {
181-
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
182176
Predicate<FileStatus> prefixFilter = fileStatus ->
183177
fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
184178
Predicate<FileStatus> markerTypeFilter = fileStatus ->
185179
!fileStatus.getPath().getName().equals(MARKER_TYPE_FILENAME);
186-
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
187-
.filter(prefixFilter.and(markerTypeFilter))
188-
.map(fileStatus -> fileStatus.getPath().toString())
189-
.collect(Collectors.toList());
190-
191-
if (markerDirSubPaths.size() > 0) {
192-
SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
193-
int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
194-
return context.mapToPair(markerDirSubPaths, markersFilePathStr -> {
195-
Path markersFilePath = new Path(markersFilePathStr);
196-
FileSystem fs = markersFilePath.getFileSystem(conf.get());
197-
FSDataInputStream fsDataInputStream = null;
198-
BufferedReader bufferedReader = null;
199-
Set<String> markers = new HashSet<>();
200-
try {
201-
LOG.debug("Read marker file: " + markersFilePathStr);
202-
fsDataInputStream = fs.open(markersFilePath);
203-
bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
204-
markers = bufferedReader.lines().collect(Collectors.toSet());
205-
} catch (IOException e) {
206-
throw new HoodieIOException("Failed to read file " + markersFilePathStr, e);
207-
} finally {
208-
closeQuietly(bufferedReader);
209-
closeQuietly(fsDataInputStream);
210-
}
211-
return new ImmutablePair<>(markersFilePathStr, markers);
212-
}, actualParallelism);
213-
}
180+
return FSUtils.parallelizeSubPathProcess(
181+
context, fileSystem, dirPath, parallelism, prefixFilter.and(markerTypeFilter),
182+
pairOfSubPathAndConf -> {
183+
String markersFilePathStr = pairOfSubPathAndConf.getKey();
184+
SerializableConfiguration conf = pairOfSubPathAndConf.getValue();
185+
return readMarkersFromFile(new Path(markersFilePathStr), conf);
186+
});
214187
}
215188
return new HashMap<>();
216189
} catch (IOException ioe) {
217190
throw new HoodieIOException(ioe.getMessage(), ioe);
218191
}
219192
}
193+
194+
/**
195+
* Reads the markers stored in the underlying file.
196+
*
197+
* @param markersFilePath file path for the markers
198+
* @param conf serializable config
199+
* @return markers in a {@code Set} of String.
200+
*/
201+
public static Set<String> readMarkersFromFile(Path markersFilePath, SerializableConfiguration conf) {
202+
FSDataInputStream fsDataInputStream = null;
203+
Set<String> markers = new HashSet<>();
204+
try {
205+
LOG.debug("Read marker file: " + markersFilePath);
206+
FileSystem fs = markersFilePath.getFileSystem(conf.get());
207+
fsDataInputStream = fs.open(markersFilePath);
208+
markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
209+
} catch (IOException e) {
210+
throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
211+
} finally {
212+
closeQuietly(fsDataInputStream);
213+
}
214+
return markers;
215+
}
220216
}

0 commit comments

Comments
 (0)