Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ public class DatePartitionPathSelector extends DFSPathSelector {
private final String dateFormat;
private final int datePartitionDepth;
private final int numPrevDaysToList;
private final LocalDate fromDate;
private final LocalDate currentDate;
private final int partitionsListParallelism;

/** Configs supported. */
Expand Down Expand Up @@ -107,17 +105,17 @@ public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf
*/
dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
// If not specified the current date is assumed by default.
currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
fromDate = currentDate.minusDays(numPrevDaysToList);
partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
}

@Override
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
Option<String> lastCheckpointStr,
long sourceLimit) {
// If not specified the current date is assumed by default.
LocalDate currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));

// obtain all eligible files under root folder.
LOG.info(
"Root path => "
Expand All @@ -133,7 +131,7 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaS
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);

List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
path -> {
Expand Down Expand Up @@ -173,7 +171,7 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaS
* Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from
* 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
*/
public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) {
List<String> partitionPaths = new ArrayList<>();
// get all partition paths before date partition level
partitionPaths.add(rootPath);
Expand All @@ -199,6 +197,7 @@ public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, Fi
// Prune date partitions to last few days
return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism)
.filter(s -> {
LocalDate fromDate = currentDate.minusDays(numPrevDaysToList);
String[] splits = s.split("/");
String datePartition = splits[splits.length - 1];
LocalDate partitionDate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void testPruneDatePartitionPaths(
createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);

List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString(), LocalDate.parse(currentDate));
assertEquals(expectedNumFiles, paths.size());
}
}