diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index 1152cd65cabcd..aed6c6b7358b0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -52,7 +52,7 @@ public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, Spark
   @Override
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
     return selectPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
         .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
index dc40b47dd16cf..1ce50b2627b33 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -92,7 +92,7 @@ public CsvDFSSource(TypedProperties props,
   protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
       long sourceLimit) {
     Pair<Option<String>, String> selPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
     return Pair.of(fromFiles(
         selPathsWithMaxModificationTime.getLeft()), selPathsWithMaxModificationTime.getRight());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
index d34289daa0942..64da4f4f50f5d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
@@ -44,7 +44,7 @@ public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, Spark
   @Override
   protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
     return selPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
         .orElse(new InputBatch<>(Option.empty(), selPathsWithMaxModificationTime.getRight()));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 55d2de2d4c360..a56a878f1fe73 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -45,7 +45,7 @@ public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, Sp
   @Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
     return selectPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
         .orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 47419e0297550..d9d3444bf0e97 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -34,8 +34,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -43,7 +45,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class DFSPathSelector {
+public class DFSPathSelector implements Serializable {
 
   protected static volatile Logger log = LogManager.getLogger(DFSPathSelector.class);
 
@@ -90,13 +92,26 @@ public static DFSPathSelector createSourceSelector(TypedProperties props,
   /**
    * Get the list of files changed since last checkpoint.
    *
+   * @param sparkContext      JavaSparkContext to help parallelize certain operations
    * @param lastCheckpointStr the last checkpoint time string, empty if first run
    * @param sourceLimit       max bytes to read each time
    * @return the list of files concatenated and their latest modified time
    */
-  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
-      long sourceLimit) {
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
+      Option<String> lastCheckpointStr, long sourceLimit) {
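+    // NOTE: the base selector does not need the Spark context itself; the parameter exists so
+    // that subclasses (e.g. DatePartitionPathSelector) can parallelize listing across the cluster.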
+    return getNextFilePathsAndMaxModificationTime(lastCheckpointStr, sourceLimit);
+  }
+
+  /**
+   * Get the list of files changed since last checkpoint.
+   *
+   * @param lastCheckpointStr the last checkpoint time string, empty if first run
+   * @param sourceLimit       max bytes to read each time
+   * @return the list of files concatenated and their latest modified time
+   */
+  @Deprecated
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
+      long sourceLimit) {
     try {
       // obtain all eligible files under root folder.
       log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
@@ -136,7 +151,7 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
   /**
    * List files recursively, filter out ineligible files/directories while doing so.
    */
-  private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
+  protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
     // skip files/dirs whose names start with (_, ., etc)
     FileStatus[] statuses = fs.listStatus(path, file ->
         IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
new file mode 100644
index 0000000000000..2cedb6ceaf883
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+
+/**
+ * Custom dfs path selector used to list just the last few days' partitions, provided there is a
+ * date based partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, listing all historical
+ * data can be time-consuming and unnecessary for such workloads.
+ *
+ * <p>The date based partition is expected to be of the format '<date column>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
+ * form `<basepath>/<partition1>/<partition2>/<date-partition>/<partition3>/` or
+ * `<basepath>/<date-partition>/<partition1>/`.
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+
+  private static volatile Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class);
+
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
+
+    public static final String LOOKBACK_DAYS =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.lookback.days";
+    public static final int DEFAULT_LOOKBACK_DAYS = 2;
+
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
+
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
+    /*
+     * datePartitionDepth = 0 is same as basepath and there is no partition. In which case
+     * this path selector would be a no-op and lists all paths under the table basepath.
+     */
+    datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
+    // If not specified, the current date is assumed by default.
+    currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
+    numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+    fromDate = currentDate.minusDays(numPrevDaysToList);
+    partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  @Override
+  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
+      Option<String> lastCheckpointStr, long sourceLimit) {
+    // obtain all eligible files under root folder.
+    LOG.info(
+        "Root path => "
+            + props.getString(ROOT_INPUT_PATH_PROP)
+            + " source limit => "
+            + sourceLimit
+            + " depth of day partition => "
+            + datePartitionDepth
+            + " num prev days to list => "
+            + numPrevDaysToList
+            + " from current date => "
+            + currentDate);
+    long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+    HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
+    SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+    List<String> prunedPartitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
+
+    List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths,
+        path -> {
+          FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
+          return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
+        }, partitionsListParallelism);
+    // sort them by modification time.
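+    // Ascending order means the size-capped loop below picks up the oldest unread files first,
+    // so the max modification time of the batch can serve as the checkpoint for the next round.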
+    eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+
+    // Filter based on checkpoint & input size, if needed
+    long currentBytes = 0;
+    long maxModificationTime = Long.MIN_VALUE;
+    List<FileStatus> filteredFiles = new ArrayList<>();
+    for (FileStatus f : eligibleFiles) {
+      if (currentBytes + f.getLen() >= sourceLimit) {
+        // we have enough data, we are done
+        break;
+      }
+
+      maxModificationTime = f.getModificationTime();
+      currentBytes += f.getLen();
+      filteredFiles.add(f);
+    }
+
+    // no data to read
+    if (filteredFiles.isEmpty()) {
+      return new ImmutablePair<>(
+          Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
+    }
+
+    // read the files out.
+    String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
+
+    return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
+  }
+
+  /**
+   * Prunes date level partitions to the last few days configured by 'LOOKBACK_DAYS' from
+   * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
+   */
+  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
+    List<String> partitionPaths = new ArrayList<>();
+    // get all partition paths before date partition level
+    partitionPaths.add(rootPath);
+    if (datePartitionDepth <= 0) {
+      return partitionPaths;
+    }
+    SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+    for (int i = 0; i < datePartitionDepth; i++) {
+      partitionPaths = context.flatMap(partitionPaths, path -> {
+        Path subDir = new Path(path);
+        FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+        // skip files/dirs whose names start with (_, ., etc)
+        FileStatus[] statuses = fileSystem.listStatus(subDir,
+            file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
+        List<String> res = new ArrayList<>();
+        for (FileStatus status : statuses) {
+          res.add(status.getPath().toString());
+        }
+        return res.stream();
+      }, partitionsListParallelism);
+    }
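+
+    // partitionPaths now holds every path at the configured date-partition depth; next, keep only
+    // those whose last path segment parses to a date inside [fromDate, currentDate].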
+    // Prune date partitions to last few days
+    return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism)
+        .filter(s -> {
+          String[] splits = s.split("/");
+          String datePartition = splits[splits.length - 1];
+          LocalDate partitionDate;
+          if (datePartition.contains("=")) {
+            String[] moreSplit = datePartition.split("=");
+            ValidationUtils.checkArgument(
+                moreSplit.length == 2,
+                "Partition Field (" + datePartition + ") not in expected format");
+            partitionDate = LocalDate.parse(moreSplit[1]);
+          } else {
+            partitionDate = LocalDate.parse(datePartition);
+          }
+          return (partitionDate.isEqual(fromDate) || partitionDate.isAfter(fromDate))
+              && (partitionDate.isEqual(currentDate) || partitionDate.isBefore(currentDate));
+        }).collect();
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
new file mode 100644
index 0000000000000..b7e127924f998
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
+
+  private transient HoodieSparkEngineContext context = null;
+  static List<LocalDate> totalDates;
+
+  @BeforeAll
+  public static void initClass() {
+    String s = "2020-07-21";
+    String e = "2020-07-25";
+    LocalDate start = LocalDate.parse(s);
+    LocalDate end = LocalDate.parse(e);
+    totalDates = new ArrayList<>();
+    while (!start.isAfter(end)) {
+      totalDates.add(start);
+      start = start.plusDays(1);
+    }
+  }
+
+  @BeforeEach
+  public void setup() {
+    initSparkContexts();
+    initPath();
+    initFileSystem();
+    context = new HoodieSparkEngineContext(jsc);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    cleanupResources();
+  }
+
+  /*
+   * Create date partitions with some files under each of the leaf dirs.
+   */
+  public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean hiveStyle)
+      throws IOException {
+    List<Path> allFiles = new ArrayList<>();
+    for (Path path : leafDirs) {
+      List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle);
+      for (Path datePartition : datePartitions) {
+        allFiles.addAll(createRandomFilesUnder(datePartition));
+      }
+    }
+    return allFiles;
+  }
+
+  /**
+   * Create all parent level dirs before the date partitions.
+   *
+   * @param root     Current parent dir. Initially this points to table basepath.
+   * @param dirs     List of sub dirs to be created under root.
+   * @param depth    Depth of partitions before date partitions.
+   * @param leafDirs Collects the list of leaf dirs. These will be the immediate parents of date based partitions.
+   * @throws IOException
+   */
+  public void createParentDirsBeforeDatePartitions(Path root, List<String> dirs, int depth, List<Path> leafDirs)
+      throws IOException {
+    if (depth <= 0) {
+      leafDirs.add(root);
+      return;
+    }
+    for (String s : dirs) {
+      Path subdir = new Path(root, s);
+      fs.mkdirs(subdir);
+      createParentDirsBeforeDatePartitions(subdir, generateRandomStrings(), depth - 1, leafDirs);
+    }
+  }
+
+  /*
+   * Random string generation util used for generating file names or file contents.
+   */
+  private List<String> generateRandomStrings() {
+    List<String> subDirs = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      subDirs.add(UUID.randomUUID().toString());
+    }
+    return subDirs;
+  }
+
+  /*
+   * Generate date based partitions under a parent dir, with or without hive-style formatting.
+   */
+  private List<Path> generateDatePartitionsUnder(Path parent, boolean hiveStyle) throws IOException {
+    List<Path> datePartitions = new ArrayList<>();
+    String prefix = (hiveStyle ? "dt=" : "");
+    for (int i = 0; i < 5; i++) {
+      Path child = new Path(parent, prefix + totalDates.get(i).toString());
+      fs.mkdirs(child);
+      datePartitions.add(child);
+    }
+    return datePartitions;
+  }
+
+  /*
+   * Creates random files under the given directory.
+   */
+  private List<Path> createRandomFilesUnder(Path path) throws IOException {
+    List<Path> resultFiles = new ArrayList<>();
+    List<String> fileNames = generateRandomStrings();
+    for (String fileName : fileNames) {
+      List<String> fileContent = generateRandomStrings();
+      String[] lines = new String[fileContent.size()];
+      lines = fileContent.toArray(lines);
+      Path file = new Path(path, fileName);
+      UtilitiesTestBase.Helpers.saveStringsToDFS(lines, fs, file.toString());
+      resultFiles.add(file);
+    }
+    return resultFiles;
+  }
+
+  private static TypedProperties getProps(
+      String basePath, int datePartitionDepth, int numDaysToList, String currentDate) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(ROOT_INPUT_PATH_PROP, basePath);
+    properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
+    properties.put(LOOKBACK_DAYS, "" + numDaysToList);
+    properties.put(CURRENT_DATE, currentDate);
+    return properties;
+  }
+
+  /*
+   * Return test params => (table basepath, date partition depth, num of prev days to list,
+   * current date, is date partition formatted in hive style?, expected number of paths
+   * after pruning)
+   */
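+  // e.g. table7: depth 3 => 5 x 5 = 25 leaf dirs, each holding 5 date partitions; a 2-day
+  // lookback from 2020-07-25 keeps 3 dates (07-23..25) per leaf dir => 25 x 3 = 75 paths.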
+  private static Stream<Arguments> configParams() {
+    Object[][] data =
+        new Object[][] {
+          {"table1", 0, 2, "2020-07-25", true, 1},
+          {"table2", 0, 2, "2020-07-25", false, 1},
+          {"table3", 1, 3, "2020-07-25", true, 4},
+          {"table4", 1, 3, "2020-07-25", false, 4},
+          {"table5", 2, 1, "2020-07-25", true, 10},
+          {"table6", 2, 1, "2020-07-25", false, 10},
+          {"table7", 3, 2, "2020-07-25", true, 75},
+          {"table8", 3, 2, "2020-07-25", false, 75}
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @ParameterizedTest(name = "[{index}] {0}")
+  @MethodSource("configParams")
+  public void testPruneDatePartitionPaths(
+      String tableName,
+      int datePartitionDepth,
+      int numPrevDaysToList,
+      String currentDate,
+      boolean isHiveStylePartition,
+      int expectedNumPaths)
+      throws IOException {
+    TypedProperties props = getProps(basePath + "/" + tableName, datePartitionDepth, numPrevDaysToList, currentDate);
+    DatePartitionPathSelector pathSelector = new DatePartitionPathSelector(props, jsc.hadoopConfiguration());
+
+    Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+    int totalDepthBeforeDatePartitions = props.getInteger(DATE_PARTITION_DEPTH) - 1;
+
+    // Create parent dirs and date partitions with files.
+    List<Path> leafDirs = new ArrayList<>();
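+    // Build <totalDepthBeforeDatePartitions> levels of random parent dirs, then date partitions
+    // (2020-07-21..25) with random files under every leaf dir.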
+    createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
+    createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);
+
+    List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
+
+    assertEquals(expectedNumPaths, paths.size());
+  }
+}