@@ -24,7 +24,7 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
@@ -180,4 +180,9 @@ trait SparkAdapter extends Serializable {
* Create instance of [[ParquetFileFormat]]
*/
def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]

/**
* Create instance of [[InterpretedPredicate]]
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate
}
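
For context, here is a minimal sketch (not part of the diff) of how a caller can use the new adapter method to evaluate a partition-pruning predicate against an InternalRow of partition values. The predicate, column type and literal values below are illustrative assumptions; the real usage introduced by this PR lives in HoodieInMemoryFileIndex.prunePartitions further down.

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, Literal}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    object InterpretedPredicateSketch extends SparkAdapterSupport {
      def main(args: Array[String]): Unit = {
        // Hypothetical predicate over a single partition column bound at ordinal 0,
        // roughly `dt = '2022-06-20'`.
        val predicate = EqualTo(
          BoundReference(0, StringType, nullable = true),
          Literal(UTF8String.fromString("2022-06-20"), StringType))

        // The adapter hides the Spark 2 (InterpretedPredicate.create) vs
        // Spark 3 (Predicate.createInterpreted) API difference.
        val boundPredicate = sparkAdapter.createInterpretedPredicate(predicate)

        println(boundPredicate.eval(InternalRow(UTF8String.fromString("2022-06-20")))) // true
        println(boundPredicate.eval(InternalRow(UTF8String.fromString("2022-06-21")))) // false
      }
    }

The Spark 2 and Spark 3 implementations of createInterpretedPredicate appear in the last two files of this diff.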
@@ -470,6 +470,20 @@ public final Stream<Pair<String, CompactionOperation>> getPendingCompactionOpera
}
}

public final List<Path> getPartitionPaths() {
try {
readLock.lock();
return fetchAllStoredFileGroups()
.filter(fg -> !isFileGroupReplaced(fg))
.map(HoodieFileGroup::getPartitionPath)
.distinct()
.map(name -> name.isEmpty() ? metaClient.getBasePathV2() : new Path(metaClient.getBasePathV2(), name))
.collect(Collectors.toList());
} finally {
readLock.unlock();
}
}

@Override
public final Stream<HoodieBaseFile> getLatestBaseFiles(String partitionStr) {
try {
@@ -104,14 +104,22 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
buildSplits(fileSlices.values.flatten.toSeq)
} else {
// TODO refactor to avoid iterating over listed files multiple times
val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters)
val partitionPaths = partitions.keys.toSeq
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
alexeykudinkin (Contributor) commented on Jun 20, 2022:

@boneanxs appreciate your effort of addressing this TODO.

Although, the better way to refactor this would be to instead change listLatestBaseFiles to listLatestFileSlices and return file slices instead of just the base files. Right now we've essentially duplicated this method's implementation in here. I'd suggest we avoid the duplication by changing the method as noted above and keeping this code mostly intact. [A sketch of this suggested shape follows this file's diff.]

Keep in mind: one of the goals of the refactoring of the COW/MOR relations was to bring the implementations closer together, making them bifurcate only at the points where it's necessary (for example, in the way they actually read file slices), and everywhere else keep them mostly identical.

A Contributor added:

cc @codope

boneanxs (Contributor, Author) replied:

I'll address this in #5722

val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)

val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
val partitionPaths = fsView.getPartitionPaths.asScala

if (partitionPaths.isEmpty || latestInstant.isEmpty) {
// If this is an empty table OR it has no completed commits yet, return
List.empty[HoodieMergeOnReadFileSplit]
} else {
val fileSlices = listFileSlices(partitionPaths)
val queryTimestamp = this.queryTimestamp.get

val fileSlices = partitionPaths.flatMap { partitionPath =>
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
}
buildSplits(fileSlices)
}
}
@@ -130,20 +138,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
}.toList
}

private def listFileSlices(partitionPaths: Seq[Path]): Seq[FileSlice] = {
// NOTE: It's critical for us to re-use [[InMemoryFileIndex]] to make sure we're leveraging
// [[FileStatusCache]] and avoid listing the whole table again
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, partitionPaths)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, inMemoryFileIndex.allFiles.toArray)

val queryTimestamp = this.queryTimestamp.get

partitionPaths.flatMap { partitionPath =>
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
}
}
}

object MergeOnReadSnapshotRelation {
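
To make the reviewer's suggestion above concrete, here is a hedged sketch of one possible shape for a shared listLatestFileSlices hook; the name, signature and placement are assumptions drawn from the comment thread, not code from this PR (the author defers the refactor to #5722). The members it references (sparkSession, fileIndex, metaClient, timeline, latestInstant, queryTimestamp, basePath, getRelativePartitionPath) are assumed to be provided by the enclosing relation, as they are elsewhere in this diff.

    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.Path
    import org.apache.hudi.common.model.FileSlice
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView
    import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Hypothetical shared hook: the relation would call this and then only diverge in
    // how the returned file slices are turned into read splits.
    protected def listLatestFileSlices(globPaths: Seq[Path],
                                       partitionFilters: Seq[Expression],
                                       dataFilters: Seq[Expression]): Seq[FileSlice] = {
      latestInstant.map { _ =>
        // Reuse the table's cached file index when no glob paths were supplied.
        val partitionDirs = if (globPaths.isEmpty) {
          fileIndex.listFiles(partitionFilters, dataFilters)
        } else {
          HoodieInMemoryFileIndex.create(sparkSession, globPaths)
            .listFiles(partitionFilters, dataFilters)
        }

        val fsView = new HoodieTableFileSystemView(
          metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
        val queryTimestamp = this.queryTimestamp.get

        fsView.getPartitionPaths.asScala.toSeq.flatMap { partitionPath =>
          val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp)
            .iterator().asScala.toSeq
        }
      }.getOrElse(Seq.empty)
    }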
@@ -20,9 +20,12 @@ package org.apache.spark.execution.datasources
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.HoodieHadoopFSUtils
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType

@@ -34,7 +37,77 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession,
parameters: Map[String, String],
userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) {
extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache)
with SparkAdapterSupport {

/**
* Returns all valid files grouped into partitions when the data is partitioned. If the data is unpartitioned,
* this will return a single partition with no partition values
*
* NOTE: This method replicates the one it overrides; however, it uses a custom method
* that accepts files starting with "."
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
boneanxs (Contributor, Author) commented:

Override this to accept files starting with "." to allow log files.

A reviewer (Contributor) asked:

Why is there a need for that? That's exactly what listLeafFiles is overridden for

boneanxs (Author) replied:

This is needed, as we'll first get the partitions in listLatestBaseFiles:

protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
    val partitionDirs = if (globbedPaths.isEmpty) {
      fileIndex.listFiles(partitionFilters, dataFilters)
    } else {
      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
      inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
    }

which calls inMemoryFileIndex.listFiles to get all partitionDirs. If we don't override this method, the log paths will be filtered out:

val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
      // Method isDataPath will filter path if it is a log file
      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
    }

As isDataPath is a private method in PartitioningAwareFileIndex, we can't override it directly, so we need to override listFiles. [A sketch contrasting Spark's stock filter with this override follows this file's diff.]

The reviewer replied:

It makes sense now. Thanks for clarifying!

val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(f => isDataPath(f.getPath))

case None =>
// Directory does not exist, or has no children files
Nil
}
PartitionDirectory(values, files)
}
}
logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
selectedPartitions
}

private def isDataPath(path: Path): Boolean = {
val name = path.getName
!(name.startsWith("_") && !name.contains("="))
}

private def prunePartitions(
predicates: Seq[Expression],
partitionSpec: PartitionSpec): Seq[PartitionPath] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}

if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)

val boundPredicate = sparkAdapter.createInterpretedPredicate(predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
})

val selected = partitions.filter {
case PartitionPath(values, _) => boundPredicate.eval(values)
}
logInfo {
val total = partitions.length
val selectedSize = selected.length
val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
s"Selected $selectedSize partitions out of $total, " +
s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions."
}

selected
} else {
partitions
}
}

/**
* List leaf files of given paths. This method will submit a Spark job to do parallel
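
To illustrate the thread above, here is a small hedged sketch contrasting the isDataPath override added in this diff with an approximation of the stock filter in Spark's PartitioningAwareFileIndex (reproduced from memory, not from this diff); the file names are purely illustrative and make no claim about Hudi's exact log-file naming scheme.

    import org.apache.hadoop.fs.Path

    // Approximation of Spark's stock PartitioningAwareFileIndex.isDataPath: it also
    // drops names starting with ".", which is what hides Hudi log files from listings.
    def sparkIsDataPath(path: Path): Boolean = {
      val name = path.getName
      !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
    }

    // The override in this diff keeps "."-prefixed files so that log files survive listing.
    def hoodieIsDataPath(path: Path): Boolean = {
      val name = path.getName
      !(name.startsWith("_") && !name.contains("="))
    }

    // Illustrative paths only.
    val baseFile = new Path("/tbl/dt=2022-06-20/abc-0_1-0-1_20220620.parquet")
    val logFile  = new Path("/tbl/dt=2022-06-20/.abc-0_20220620.log.1_0-1-0")

    assert(sparkIsDataPath(baseFile) && hoodieIsDataPath(baseFile))
    assert(!sparkIsDataPath(logFile) && hoodieIsDataPath(logFile)) // only the override keeps the log file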
@@ -719,6 +719,8 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
.save(basePath)
// There should be no base file in the file list.
assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
// Test read logs only mor table with glob paths.
assertEquals(20, spark.read.format("hudi").load(basePath + "/*/*/*/*").count())
// Test read log only mor table.
assertEquals(20, spark.read.format("hudi").load(basePath).count())
}
@@ -24,7 +24,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark2_4AvroDeserializer, HoodieSpark2_4AvroSerializer, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
@@ -168,4 +168,8 @@ class Spark2Adapter extends SparkAdapter {
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
}

override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
InterpretedPredicate.create(e)
}
}
@@ -24,7 +24,7 @@ import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
@@ -149,4 +149,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
None
}
}

override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
Predicate.createInterpreted(e)
}
}