diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 302ca755a39d3..10978e407f7f5 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -102,7 +102,14 @@ object DataSourceReadOptions {
     * This option allows setting filters directly on Hoodie Source
     */
   val PUSH_DOWN_INCR_FILTERS_OPT_KEY = "hoodie.datasource.read.incr.filters"
-  val DEFAULTPUSH_DOWN_FILTERS_OPT_VAL = ""
+  val DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL = ""
+
+  /**
+    * For use-cases where the user only wants to incrementally pull from certain partitions instead of the full table.
+    * This option allows using a glob pattern to filter directly on path.
+    */
+  val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob"
+  val DEFAULT_INCR_PATH_GLOB_OPT_VAL = ""
 }
 
 /**
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index f70655b4f44ff..a9e7389d6c942 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi
 
+import org.apache.hadoop.fs.GlobPattern
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -84,7 +85,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
     val filters = {
       if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
-        val filterStr = optParams.get(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY).getOrElse("")
+        val filterStr = optParams.getOrElse(
+          DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
+          DataSourceReadOptions.DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL)
         filterStr.split(",").filter(!_.isEmpty)
       } else {
         Array[String]()
@@ -100,17 +103,26 @@
           .get, classOf[HoodieCommitMetadata])
         fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
       }
+      val pathGlobPattern = optParams.getOrElse(
+        DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
+        DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
+      val filteredFullPath = if (!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
+        val globMatcher = new GlobPattern("*" + pathGlobPattern)
+        fileIdToFullPath.filter(p => globMatcher.matches(p._2))
+      } else {
+        fileIdToFullPath
+      }
       // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
       // will filter out all the files incorrectly.
       sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
       val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-      if (fileIdToFullPath.isEmpty) {
+      if (filteredFullPath.isEmpty) {
         sqlContext.sparkContext.emptyRDD[Row]
       } else {
         log.info("Additional Filters to be applied to incremental source are :" + filters)
         filters.foldLeft(sqlContext.read.options(sOpts)
           .schema(latestSchema)
-          .parquet(fileIdToFullPath.values.toList: _*)
+          .parquet(filteredFullPath.values.toList: _*)
           .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
           .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
             commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
diff --git a/hudi-spark/src/test/scala/TestDataSource.scala b/hudi-spark/src/test/scala/TestDataSource.scala
index e9675ab29b452..6bd32b475d1c6 100644
--- a/hudi-spark/src/test/scala/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/TestDataSource.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
+import org.apache.spark.sql.functions.col
 import org.junit.Assert._
 import org.junit.rules.TemporaryFolder
 import org.junit.{Before, Test}
@@ -135,6 +136,14 @@ class TestDataSource extends AssertionsForJUnit {
     countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
     assertEquals(1, countsPerCommit.length)
     assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+    // pull the latest commit within certain partitions
+    val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
+      .load(basePath)
+    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
   }
 
   @Test def testMergeOnReadStorage() {
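For reviewers who want to try the change outside the test, here is a minimal usage sketch of the new `hoodie.datasource.read.incr.path.glob` option, mirroring the pattern in `TestDataSource`. The table location, begin instant, and SparkSession setup are placeholder assumptions, not part of this patch; only the option keys come from `DataSourceReadOptions`.

```scala
import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object IncrementalGlobPullExample extends App {
  // Placeholder values: point these at an existing Hudi table and an earlier commit instant.
  val basePath = "file:///tmp/hoodie_test_table"
  val beginInstantTime = "000"

  val spark = SparkSession.builder()
    .appName("hudi-incremental-glob-pull")
    .master("local[2]")
    .getOrCreate()

  // Incremental query limited to the 2016 partitions. IncrementalRelation prepends "*"
  // to the glob, so the pattern only needs to match the partition-path suffix of each file.
  val incrementalDF = spark.read.format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstantTime)
    .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
    .load(basePath)

  incrementalDF.show(false)
  spark.stop()
}
```

When the option is left at its default empty value, `filteredFullPath` falls back to `fileIdToFullPath`, so existing incremental queries behave exactly as before.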