From 155366df398738e0f19ee978a0c355053cbd48c3 Mon Sep 17 00:00:00 2001 From: Ho Tien Vu Date: Thu, 8 Oct 2020 23:19:34 +0800 Subject: [PATCH] [HUDI-1330] handle prefix filtering at directory level The current DFSPathSelector only ignore prefix(_, .) at the file level while files under subdirectories e.g. (.checkpoint/*) are still considered which result in bad-format exception during reading. --- .../sources/helpers/DFSPathSelector.java | 49 ++++++++++--------- .../sources/AbstractDFSSourceTestBase.java | 13 +++++ 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java index 6b58003e843c0..47419e0297550 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java @@ -31,16 +31,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -63,7 +62,7 @@ public static class Config { protected final TypedProperties props; public DFSPathSelector(TypedProperties props, Configuration hadoopConf) { - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP)); this.props = props; this.fs = 
FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf); } @@ -101,18 +100,8 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio try { // obtain all eligible files under root folder. log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit); - List<FileStatus> eligibleFiles = new ArrayList<>(); - RemoteIterator<LocatedFileStatus> fitr = - fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true); - while (fitr.hasNext()) { - LocatedFileStatus fileStatus = fitr.next(); - if (fileStatus.isDirectory() - || fileStatus.getLen() == 0 - || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) { - continue; - } - eligibleFiles.add(fileStatus); - } + long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE); + List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime); // sort them by modification time. 
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime)); // Filter based on checkpoint & input size, if needed @@ -120,11 +109,6 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio long maxModificationTime = Long.MIN_VALUE; List<FileStatus> filteredFiles = new ArrayList<>(); for (FileStatus f : eligibleFiles) { - if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) { - // skip processed files - continue; - } - if (currentBytes + f.getLen() >= sourceLimit) { // we have enough data, we are done break; } @@ -136,7 +120,7 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio } // no data to read - if (filteredFiles.size() == 0) { + if (filteredFiles.isEmpty()) { return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE))); } @@ -148,4 +132,25 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe); } } + + /** + * List files recursively, filter out ineligible files/directories while doing so. 
+ */ + private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException { + // skip files/dirs whose names start with (_, ., etc) + FileStatus[] statuses = fs.listStatus(path, file -> + IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx))); + List<FileStatus> res = new ArrayList<>(); + for (FileStatus status: statuses) { + if (status.isDirectory()) { + // avoid infinite loop + if (!status.isSymlink()) { + res.addAll(listEligibleFiles(fs, status.getPath(), lastCheckpointTime)); + } + } else if (status.getModificationTime() > lastCheckpointTime && status.getLen() > 0) { + res.add(status); + } + } + return res; + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java index f63f3e96b6e76..e02d00caec752 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java @@ -175,5 +175,18 @@ public void testReadingFromSource() throws IOException { InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat( Option.empty(), Long.MAX_VALUE); assertEquals(10100, fetch5.getBatch().get().count()); + + // 6. Should skip files/directories whose names start with prefixes ("_", ".") + generateOneFile(".checkpoint/3", "002", 100); + generateOneFile("_checkpoint/3", "002", 100); + generateOneFile(".3", "002", 100); + generateOneFile("_3", "002", 100); + // also work with nested directory + generateOneFile("foo/.bar/3", "002", 1); // not ok + generateOneFile("foo/bar/3", "002", 1); // ok + // fetch everything from the beginning + InputBatch<JavaRDD<GenericRecord>> fetch6 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.empty(), Long.MAX_VALUE); + assertEquals(10101, fetch6.getBatch().get().count()); } }