@@ -68,11 +68,9 @@ class DefaultSource extends RelationProvider
override def createRelation(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// Add default options for unspecified read options keys.
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
val path = optParams.get("path")
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)

val path = parameters.get("path")
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS.key)
if (path.isEmpty && readPathsStr.isEmpty) {
throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
}
@@ -87,6 +85,16 @@ } else {
} else {
Seq.empty
}

// Add default options for unspecified read options keys.
val parameters = (if (globPaths.nonEmpty) {
Map(
"glob.paths" -> globPaths.mkString(",")
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)

// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
DataSourceUtils.getTablePath(fs, globPaths.toArray)
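To illustrate the option-merging step introduced above, here is a minimal, self-contained sketch: the resolved glob paths are serialized into a synthetic `glob.paths` entry and merged with the defaulted read options before they reach the file index. `parametersWithReadDefaults` is stubbed with an illustrative default, so this only shows the merge order, not Hudi's actual defaults.

```scala
object GlobPathsOptionSketch {
  // Stand-in for DataSourceOptionsHelper.parametersWithReadDefaults; the real
  // defaults differ, this stub only illustrates the merge order.
  private def parametersWithReadDefaults(optParams: Map[String, String]): Map[String, String] =
    Map("hoodie.datasource.query.type" -> "snapshot") ++ optParams

  // Mirrors the merge in DefaultSource.createRelation: a "glob.paths" entry is
  // added only when glob paths were resolved; entries from the right-hand map
  // override it on key collisions, exactly as `++` does in the diff above.
  def buildParameters(optParams: Map[String, String], globPaths: Seq[String]): Map[String, String] = {
    val globEntry =
      if (globPaths.nonEmpty) Map("glob.paths" -> globPaths.mkString(","))
      else Map.empty[String, String]
    globEntry ++ parametersWithReadDefaults(optParams)
  }

  def main(args: Array[String]): Unit = {
    val params = buildParameters(
      Map("path" -> "/tmp/hudi_table/*/*"),
      Seq("/tmp/hudi_table/2022/01", "/tmp/hudi_table/2022/02"))
    println(params("glob.paths")) // /tmp/hudi_table/2022/01,/tmp/hudi_table/2022/02
  }
}
```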
@@ -26,9 +26,10 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
@@ -44,7 +45,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -222,7 +223,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
toScalaOption(timeline.lastInstant())

protected def queryTimestamp: Option[String] =
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
specifiedQueryTimestamp.orElse(latestInstant.map(_.getTimestamp))

/**
* Returns true in case table supports Schema on Read (Schema Evolution)
@@ -340,20 +341,49 @@
*/
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]

protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
val partitionDirs = if (globbedPaths.isEmpty) {
/**
* Gets all PartitionDirectories based on globPaths if specified; otherwise lists under the table path.
* Partition pruning is applied where possible.
*/
private def listPartitionDirectories(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
if (globPaths.isEmpty) {
fileIndex.listFiles(partitionFilters, dataFilters)
} else {
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}
}

/**
* Gets all latest base files grouped by partition path. If globPaths is empty, files are
* listed under the table path.
*/
protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)

val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)

latestBaseFiles.groupBy(getPartitionPath)
}

/**
* Gets all latest file slices (base files plus log files, if any) from globPaths if non-empty;
* otherwise the listing is done under the table path.
*/
protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
latestInstant.map { _ =>
val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)

val queryTimestamp = this.queryTimestamp.get
fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
}
}.getOrElse(Seq())
}

protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)

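The three helpers above (`listPartitionDirectories`, `listLatestBaseFiles`, `listLatestFileSlices`) now share a single listing step. The sketch below reproduces that shape with simplified stand-in types rather than Hudi's real `PartitionDirectory`/`FileStatus`/`FileSlice` classes, so it only shows the control flow, not the file-system-view logic:

```scala
// Simplified stand-in for Spark's PartitionDirectory; not the real class.
final case class PartitionDir(partitionPath: String, files: Seq[String])

trait ListingSketch {
  // Stand-in for fileIndex.listFiles(partitionFilters, dataFilters)
  def listViaTableIndex(): Seq[PartitionDir]
  // Stand-in for HoodieInMemoryFileIndex.create(sparkSession, globPaths).listFiles(...)
  def listViaGlobPaths(globPaths: Seq[String]): Seq[PartitionDir]

  // Mirrors listPartitionDirectories: use the table's own (pruning-capable)
  // file index, and only build an ad-hoc index when glob paths were supplied.
  private def listPartitionDirs(globPaths: Seq[String]): Seq[PartitionDir] =
    if (globPaths.isEmpty) listViaTableIndex() else listViaGlobPaths(globPaths)

  // Mirrors listLatestBaseFiles: one shared listing step, then group by partition.
  // (The HoodieTableFileSystemView latest-base-file filtering is elided here.)
  def latestBaseFiles(globPaths: Seq[String]): Map[String, Seq[String]] =
    listPartitionDirs(globPaths)
      .flatMap(dir => dir.files.map(file => dir.partitionPath -> file))
      .groupBy { case (partition, _) => partition }
      .map { case (partition, pairs) => partition -> pairs.map(_._2) }
}
```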
@@ -76,7 +76,7 @@ case class HoodieFileIndex(spark: SparkSession,
metaClient = metaClient,
schemaSpec = schemaSpec,
configProperties = getConfigProperties(spark, options),
queryPaths = Seq(HoodieFileIndex.getQueryPath(options)),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
@@ -341,10 +341,15 @@ object HoodieFileIndex extends Logging {
}
}

private def getQueryPath(options: Map[String, String]) = {
new Path(options.get("path") match {
case Some(p) => p
case None => throw new IllegalArgumentException("'path' option required")
})
private def getQueryPaths(options: Map[String, String]): Seq[Path] = {
options.get("path") match {
case Some(p) => Seq(new Path(p))
case None =>
options.getOrElse("glob.paths",
throw new IllegalArgumentException("'path' or 'glob.paths' option required"))
.split(",")
.map(new Path(_))
.toSeq
}
}
}
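The resolution order implemented by `getQueryPaths` (`path` wins when present, otherwise the comma-separated `glob.paths` entry is split, otherwise the call fails) can be exercised in isolation. Below is a standalone sketch using plain `String`s instead of `org.apache.hadoop.fs.Path`, so it runs without Hadoop on the classpath:

```scala
object QueryPathsSketch {
  // String-based version of the HoodieFileIndex.getQueryPaths resolution above.
  def getQueryPaths(options: Map[String, String]): Seq[String] =
    options.get("path") match {
      case Some(p) => Seq(p)
      case None =>
        options.getOrElse("glob.paths",
          throw new IllegalArgumentException("'path' or 'glob.paths' option required"))
          .split(",")
          .toSeq
    }

  def main(args: Array[String]): Unit = {
    println(getQueryPaths(Map("path" -> "/tmp/tbl")))                                // one path
    println(getQueryPaths(Map("glob.paths" -> "/tmp/tbl/2022/01,/tmp/tbl/2022/02"))) // two paths
    // getQueryPaths(Map.empty) throws IllegalArgumentException
  }
}
```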
@@ -104,24 +104,8 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
buildSplits(fileSlices.values.flatten.toSeq)
} else {
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)

val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
val partitionPaths = fsView.getPartitionPaths.asScala

if (partitionPaths.isEmpty || latestInstant.isEmpty) {
// If this an empty table OR it has no completed commits yet, return
List.empty[HoodieMergeOnReadFileSplit]
} else {
val queryTimestamp = this.queryTimestamp.get

val fileSlices = partitionPaths.flatMap { partitionPath =>
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
}
buildSplits(fileSlices)
}
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
Contributor Author
Hi @alexeykudinkin, I moved this implementation to HoodieBaseRelation; could you please review it?

Contributor
SG, will do

Contributor Author
Hi @alexeykudinkin, could you please review it in the next few days, as it blocks PR #6046? Thanks!

buildSplits(fileSlices)
}
}

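With the listing logic consolidated in `HoodieBaseRelation.listLatestFileSlices`, a MOR snapshot query over glob paths goes through the same code path as the tests below. A hypothetical usage sketch (the table location and partition layout are made up; an active `spark` session and the `DataSourceReadOptions` import are assumed):

```scala
// Snapshot-read a MOR table through glob paths; DefaultSource resolves the globs
// and forwards them to the relation via the "glob.paths" parameter shown earlier.
val morSnapshotDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("/tmp/hudi_mor_table/2021/*/*")

morSnapshotDF.count()
```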
@@ -17,7 +17,7 @@

package org.apache.hudi.functional

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
@@ -296,6 +296,39 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals("replacecommit", commits(1))
}

@Test
def testReadPathsOnCopyOnWriteTable(): Unit = {
val records1 = dataGen.generateInsertsContainsAllPartitions("001", 20)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)

val record1FilePaths = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
.filter(!_.getPath.getName.contains("hoodie_partition_metadata"))
.filter(_.getPath.getName.endsWith("parquet"))
.map(_.getPath.toString)
.mkString(",")

val records2 = dataGen.generateInsertsContainsAllPartitions("002", 20)
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
// Use bulk insert here to make sure the files have different file groups.
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)

val hudiReadPathDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.READ_PATHS.key, record1FilePaths)
.load()

val expectedCount = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
assertEquals(expectedCount, hudiReadPathDF.count())
}

@Test def testOverWriteTableModeUseReplaceAction(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -704,6 +704,96 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
assertEquals(partitionCounts("2021/03/03"), count7)
}

@Test
def testReadPathsForMergeOnReadTable(): Unit = {
// Read paths containing only base files
val records1 = dataGen.generateInserts("001", 100)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
val baseFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
.filter(_.getPath.getName.endsWith("parquet"))
.map(_.getPath.toString)
.mkString(",")
val records2 = dataGen.generateUniqueDeleteRecords("002", 100)
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
.save(basePath)
val hudiReadPathDF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.option(DataSourceReadOptions.READ_PATHS.key, baseFilePath)
.load()

val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
assertEquals(expectedCount1, hudiReadPathDF1.count())

// Read paths containing both base files and log files
val logFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
.filter(_.getPath.getName.contains("log"))
.map(_.getPath.toString)
.mkString(",")

val readPaths = baseFilePath + "," + logFilePath
val hudiReadPathDF2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.option(DataSourceReadOptions.READ_PATHS.key, readPaths)
.load()

assertEquals(0, hudiReadPathDF2.count())
}

@Test
def testReadPathsForOnlyLogFiles(): Unit = {
initMetaClient(HoodieTableType.MERGE_ON_READ)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
inputDF1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Use InMemoryIndex to generate a log-only MOR table.
.option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
.mode(SaveMode.Overwrite)
.save(basePath)
// There should be no base file in the file listing.
assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))

val logFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
.filter(_.getPath.getName.contains("log"))
.map(_.getPath.toString)
.mkString(",")

val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
inputDF2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Use InMemoryIndex to generate a log-only MOR table.
.option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
.mode(SaveMode.Append)
.save(basePath)
// There should be no base file in the file listing.
assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))

val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)

val hudiReadPathDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.option(DataSourceReadOptions.READ_PATHS.key, logFilePath)
.load()

assertEquals(expectedCount1, hudiReadPathDF.count())
}

@Test
def testReadLogOnlyMergeOnReadTable(): Unit = {
initMetaClient(HoodieTableType.MERGE_ON_READ)