Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -63,7 +62,7 @@ public static class Config {
protected final TypedProperties props;

public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
this.props = props;
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
}
Expand Down Expand Up @@ -101,30 +100,15 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
List<FileStatus> eligibleFiles = new ArrayList<>();
RemoteIterator<LocatedFileStatus> fitr =
fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
while (fitr.hasNext()) {
LocatedFileStatus fileStatus = fitr.next();
if (fileStatus.isDirectory()
|| fileStatus.getLen() == 0
|| IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
}
eligibleFiles.add(fileStatus);
}
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime);
// sort them by modification time.
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) {
if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) {
// skip processed files
continue;
}

if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
Expand All @@ -136,7 +120,7 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
}

// no data to read
if (filteredFiles.size() == 0) {
if (filteredFiles.isEmpty()) {
return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}

Expand All @@ -148,4 +132,25 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
}
}

/**
* List files recursively, filter out illegible files/directories while doing so.
*/
private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
// skip files/dirs whose names start with (_, ., etc)
FileStatus[] statuses = fs.listStatus(path, file ->
IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
List<FileStatus> res = new ArrayList<>();
for (FileStatus status: statuses) {
if (status.isDirectory()) {
// avoid infinite loop
if (!status.isSymlink()) {
res.addAll(listEligibleFiles(fs, status.getPath(), lastCheckpointTime));
}
} else if (status.getModificationTime() > lastCheckpointTime && status.getLen() > 0) {
res.add(status);
}
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,18 @@ public void testReadingFromSource() throws IOException {
InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());

// 6. Should skip files/directories whose names start with prefixes ("_", ".")
generateOneFile(".checkpoint/3", "002", 100);
generateOneFile("_checkpoint/3", "002", 100);
generateOneFile(".3", "002", 100);
generateOneFile("_3", "002", 100);
// also work with nested directory
generateOneFile("foo/.bar/3", "002", 1); // not ok
generateOneFile("foo/bar/3", "002", 1); // ok
// fetch everything from the beginning
InputBatch<JavaRDD<GenericRecord>> fetch6 = sourceFormatAdapter.fetchNewDataInAvroFormat(
Option.empty(), Long.MAX_VALUE);
assertEquals(10101, fetch6.getBatch().get().count());
}
}