From 236a11947fef3cc2172360721a053ae1d187f1cc Mon Sep 17 00:00:00 2001 From: akanungo Date: Fri, 12 Mar 2021 09:15:04 -0800 Subject: [PATCH 1/2] keep updating current date for every batch --- .../sources/helpers/DatePartitionPathSelector.java | 13 ++++++------- .../helpers/TestDatePartitionPathSelector.java | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java index c22657f5815c4..79f08743639d6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java @@ -74,8 +74,6 @@ public class DatePartitionPathSelector extends DFSPathSelector { private final String dateFormat; private final int datePartitionDepth; private final int numPrevDaysToList; - private final LocalDate fromDate; - private final LocalDate currentDate; private final int partitionsListParallelism; /** Configs supported. */ @@ -107,10 +105,7 @@ public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf */ dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT); datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH); - // If not specified the current date is assumed by default. - currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString())); numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS); - fromDate = currentDate.minusDays(numPrevDaysToList); partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM); } @@ -118,6 +113,9 @@ public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf public Pair, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option lastCheckpointStr, long sourceLimit) { + // If not specified the current date is assumed by default. + LocalDate currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString())); + // obtain all eligible files under root folder. LOG.info( "Root path => " @@ -133,7 +131,7 @@ public Pair, String> getNextFilePathsAndMaxModificationTime(JavaS long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE); HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext); SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf()); - List prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP)); + List prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate); List eligibleFiles = context.flatMap(prunedParitionPaths, path -> { @@ -173,7 +171,8 @@ public Pair, String> getNextFilePathsAndMaxModificationTime(JavaS * Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods. */ - public List pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) { + public List pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) { + LocalDate fromDate = currentDate.minusDays(numPrevDaysToList); List partitionPaths = new ArrayList<>(); // get all partition paths before date partition level partitionPaths.add(rootPath); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java index 30d099323d853..16736604d72d2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java @@ -210,7 +210,7 @@ public void testPruneDatePartitionPaths( createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs); createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat); - List paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString()); + List paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString(), LocalDate.parse(currentDate)); assertEquals(expectedNumFiles, paths.size()); } } From e3dff4dbef95571cb86aeb38848cee911d3451ad Mon Sep 17 00:00:00 2001 From: akanungo Date: Fri, 12 Mar 2021 12:38:14 -0800 Subject: [PATCH 2/2] minor refactoring --- .../utilities/sources/helpers/DatePartitionPathSelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java index 79f08743639d6..97106ded9bcb8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java @@ -172,7 +172,6 @@ public Pair, String> getNextFilePathsAndMaxModificationTime(JavaS * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods. */ public List pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) { - LocalDate fromDate = currentDate.minusDays(numPrevDaysToList); List partitionPaths = new ArrayList<>(); // get all partition paths before date partition level partitionPaths.add(rootPath); @@ -198,6 +197,7 @@ public List pruneDatePartitionPaths(HoodieSparkEngineContext context, Fi // Prune date partitions to last few days return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism) .filter(s -> { + LocalDate fromDate = currentDate.minusDays(numPrevDaysToList); String[] splits = s.split("/"); String datePartition = splits[splits.length - 1]; LocalDate partitionDate;