diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 71c38f765509d..484debbb81be0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -68,11 +68,9 @@ class DefaultSource extends RelationProvider
   override def createRelation(sqlContext: SQLContext,
                               optParams: Map[String, String],
                               schema: StructType): BaseRelation = {
-    // Add default options for unspecified read options keys.
-    val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
+    val path = optParams.get("path")
+    val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
 
-    val path = parameters.get("path")
-    val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS.key)
     if (path.isEmpty && readPathsStr.isEmpty) {
       throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
     }
@@ -87,6 +85,16 @@ class DefaultSource extends RelationProvider
     } else {
       Seq.empty
     }
+
+    // Add default options for unspecified read options keys.
+    val parameters = (if (globPaths.nonEmpty) {
+      Map(
+        "glob.paths" -> globPaths.mkString(",")
+      )
+    } else {
+      Map()
+    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
+
     // Get the table base path
     val tablePath = if (globPaths.nonEmpty) {
       DataSourceUtils.getTablePath(fs, globPaths.toArray)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 43a2d72733eb1..70ef2184d7172 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
@@ -44,7 +45,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -222,7 +223,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     toScalaOption(timeline.lastInstant())
 
   protected def queryTimestamp: Option[String] =
-    specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
+    specifiedQueryTimestamp.orElse(latestInstant.map(_.getTimestamp))
 
   /**
    * Returns true in case table supports Schema on Read (Schema Evolution)
@@ -340,20 +341,49 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
 
-  protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
-    val partitionDirs = if (globbedPaths.isEmpty) {
+  /**
+   * Get all PartitionDirectories based on globPaths if specified, otherwise use the table path.
+   * Will perform pruning if necessary.
+   */
+  private def listPartitionDirectories(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    if (globPaths.isEmpty) {
       fileIndex.listFiles(partitionFilters, dataFilters)
     } else {
-      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
+      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
       inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
     }
+  }
 
+  /**
+   * Get all the latest base files grouped by partition path; if globPaths is empty, list files
+   * under the table path.
+   */
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
     val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+
     val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
 
     latestBaseFiles.groupBy(getPartitionPath)
   }
 
+  /**
+   * Get all file slices (base files plus log files, if any) from globPaths if it is non-empty;
+   * otherwise list files under the table path.
+   */
+  protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
+    latestInstant.map { _ =>
+      val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+
+      val queryTimestamp = this.queryTimestamp.get
+      fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
+        val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
+        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
+      }
+    }.getOrElse(Seq())
+  }
+
   protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
     val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index d73e3a5d3b934..8748f1c15c81f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -76,7 +76,7 @@ case class HoodieFileIndex(spark: SparkSession,
     metaClient = metaClient,
     schemaSpec = schemaSpec,
     configProperties = getConfigProperties(spark, options),
-    queryPaths = Seq(HoodieFileIndex.getQueryPath(options)),
+    queryPaths = HoodieFileIndex.getQueryPaths(options),
     specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
     fileStatusCache = fileStatusCache
   )
@@ -341,10 +341,15 @@ object HoodieFileIndex extends Logging {
     }
   }
 
-  private def getQueryPath(options: Map[String, String]) = {
-    new Path(options.get("path") match {
-      case Some(p) => p
-      case None => throw new IllegalArgumentException("'path' option required")
-    })
+  private def getQueryPaths(options: Map[String, String]): Seq[Path] = {
+    options.get("path") match {
+      case Some(p) => Seq(new Path(p))
+      case None =>
+        options.getOrElse("glob.paths",
+          throw new IllegalArgumentException("'path' or 'glob paths' option required"))
+          .split(",")
+          .map(new Path(_))
+          .toSeq
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index afa43cee209fe..84098827e74b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -104,24 +104,8 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
       buildSplits(fileSlices.values.flatten.toSeq)
     } else {
-      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
-      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
-
-      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
-      val partitionPaths = fsView.getPartitionPaths.asScala
-
-      if (partitionPaths.isEmpty || latestInstant.isEmpty) {
-        // If this an empty table OR it has no completed commits yet, return
-        List.empty[HoodieMergeOnReadFileSplit]
-      } else {
-        val queryTimestamp = this.queryTimestamp.get
-
-        val fileSlices = partitionPaths.flatMap { partitionPath =>
-          val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
-          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
-        }
-        buildSplits(fileSlices)
-      }
+      val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+      buildSplits(fileSlices)
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index fea1ec357145b..26c9aeba00b11 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord
@@ -296,6 +296,39 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals("replacecommit", commits(1))
   }
 
+  @Test
+  def testReadPathsOnCopyOnWriteTable(): Unit = {
+    val records1 = dataGen.generateInsertsContainsAllPartitions("001", 20)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val record1FilePaths = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
+      .filter(!_.getPath.getName.contains("hoodie_partition_metadata"))
+      .filter(_.getPath.getName.endsWith("parquet"))
+      .map(_.getPath.toString)
+      .mkString(",")
+
+    val records2 = dataGen.generateInsertsContainsAllPartitions("002", 20)
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      // Use bulk insert here to make sure the files have different file groups.
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val hudiReadPathDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.READ_PATHS.key, record1FilePaths)
+      .load()
+
+    val expectedCount = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
+    assertEquals(expectedCount, hudiReadPathDF.count())
+  }
+
   @Test
   def testOverWriteTableModeUseReplaceAction(): Unit = {
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index ccb98deca884d..b634d0baa54d7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -704,6 +704,96 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     assertEquals(partitionCounts("2021/03/03"), count7)
   }
 
+  @Test
+  def testReadPathsForMergeOnReadTable(): Unit = {
+    // Paths contain only base files
+    val records1 = dataGen.generateInserts("001", 100)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val baseFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
+      .filter(_.getPath.getName.endsWith("parquet"))
+      .map(_.getPath.toString)
+      .mkString(",")
+    val records2 = dataGen.generateUniqueDeleteRecords("002", 100)
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiReadPathDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option(DataSourceReadOptions.READ_PATHS.key, baseFilePath)
+      .load()
+
+    val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
+    assertEquals(expectedCount1, hudiReadPathDF1.count())
+
+    // Paths contain both base files and log files
+    val logFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
+      .filter(_.getPath.getName.contains("log"))
+      .map(_.getPath.toString)
+      .mkString(",")
+
+    val readPaths = baseFilePath + "," + logFilePath
+    val hudiReadPathDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option(DataSourceReadOptions.READ_PATHS.key, readPaths)
+      .load()
+
+    assertEquals(0, hudiReadPathDF2.count())
+  }
+
+  @Test
+  def testReadPathsForOnlyLogFiles(): Unit = {
+    initMetaClient(HoodieTableType.MERGE_ON_READ)
+    val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      // Use InMemoryIndex to generate a log-only MOR table.
+      .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    // There should be no base file in the file list.
+    assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
+
+    val logFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
+      .filter(_.getPath.getName.contains("log"))
+      .map(_.getPath.toString)
+      .mkString(",")
+
+    val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20)
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+    inputDF2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      // Use InMemoryIndex to generate a log-only MOR table.
+      .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    // There should be no base file in the file list.
+    assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
+
+    val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
+
+    val hudiReadPathDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option(DataSourceReadOptions.READ_PATHS.key, logFilePath)
+      .load()
+
+    assertEquals(expectedCount1, hudiReadPathDF.count())
+  }
+
   @Test
   def testReadLogOnlyMergeOnReadTable(): Unit = {
     initMetaClient(HoodieTableType.MERGE_ON_READ)