diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java index 688e72bd786a2..0fc580db0b657 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java @@ -34,12 +34,12 @@ public class FileSlice implements Serializable { /** * File Group Id of the Slice. */ - private HoodieFileGroupId fileGroupId; + private final HoodieFileGroupId fileGroupId; /** * Point in the timeline, at which the slice was created. */ - private String baseInstantTime; + private final String baseInstantTime; /** * data file, with the compacted data, for this slice. diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 95752bac316c8..a5a3f7e215073 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -421,8 +421,8 @@ public static Map> groupFileStatusForSna } public static Map> groupSnapshotPathsByMetaClient( - Collection metaClientList, - List snapshotPaths + Collection metaClientList, + List snapshotPaths ) { Map> grouped = new HashMap<>(); metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>())); @@ -445,9 +445,11 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map< HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); List returns = new ArrayList<>(); - Map> groupedPaths = HoodieInputFormatUtils - .groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); + Map> groupedPaths = + HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); + Map fsViewCache = new HashMap<>(); + LOG.info("Found a total of " + groupedPaths.size() + " groups"); try { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala new file mode 100644 index 0000000000000..462d88e0297d2 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.engine.HoodieEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Common (engine-agnostic) File Index implementation enabling individual query engines to + * list Hudi Table contents based on the + * + *
    + *
+ * <ul>
+ *   <li>Table type (MOR, COW)</li>
+ *   <li>Query type (snapshot, read_optimized, incremental)</li>
+ *   <li>Query instant/range</li>
+ * </ul>
+ * + * @param engineContext Hudi engine-specific context + * @param metaClient Hudi table's meta-client + * @param configProperties unifying configuration (in the form of generic properties) + * @param queryType target query type + * @param queryPaths target DFS paths being queried + * @param specifiedQueryInstant instant as of which table is being queried + * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations + * @param fileStatusCache transient cache of fetched [[FileStatus]]es + */ +abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, + metaClient: HoodieTableMetaClient, + configProperties: TypedProperties, + specifiedQueryInstant: Option[String] = None, + @transient fileStatusCache: FileStatusCacheTrait) { + /** + * Get all completeCommits. + */ + lazy val completedCommits = metaClient.getCommitsTimeline + .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp) + + private lazy val _partitionColumns: Array[String] = + metaClient.getTableConfig.getPartitionFields.orElse(Array[String]()) + + private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder() + .fromProperties(configProperties) + .build() + private lazy val metadataConfig = HoodieMetadataConfig.newBuilder + .fromProperties(configProperties) + .build() + protected val basePath: String = metaClient.getBasePath + + private val queryType = configProperties(QUERY_TYPE.key()) + private val tableType = metaClient.getTableType + + @transient private val queryPath = new Path(configProperties.getOrElse("path", "'path' option required")) + @transient + @volatile protected var cachedFileSize: Long = 0L + @transient + @volatile protected var cachedAllInputFileSlices: Map[PartitionPath, Seq[FileSlice]] = _ + @volatile protected var queryAsNonePartitionedTable: Boolean = _ + @transient + @volatile private var fileSystemView: HoodieTableFileSystemView = _ + + refresh0() + + /** + * Fetch list of latest base files and log files per partition. + * + * @return mapping from string partition paths to its base/log files + */ + def listFileSlices(): Map[String, Seq[FileSlice]] = { + if (queryAsNonePartitionedTable) { + // Read as Non-Partitioned table. 
+ cachedAllInputFileSlices.map(entry => (entry._1.path, entry._2)) + } else { + cachedAllInputFileSlices.keys.toSeq.map(partition => { + (partition.path, cachedAllInputFileSlices(partition)) + }).toMap + } + } + + private def refresh0(): Unit = { + val startTime = System.currentTimeMillis() + val partitionFiles = loadPartitionPathFiles() + val allFiles = partitionFiles.values.reduceOption(_ ++ _) + .getOrElse(Array.empty[FileStatus]) + + metaClient.reloadActiveTimeline() + val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + val latestInstant = activeInstants.lastInstant() + // TODO we can optimize the flow by: + // - First fetch list of files from instants of interest + // - Load FileStatus's + fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) + val queryInstant = if (specifiedQueryInstant.isDefined) { + specifiedQueryInstant + } else if (latestInstant.isPresent) { + Some(latestInstant.get.getTimestamp) + } else { + None + } + + (tableType, queryType) match { + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => + // Fetch and store latest base and log files, and their sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + val latestSlices = if (queryInstant.isDefined) { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.path, queryInstant.get) + .iterator().asScala.toSeq + } else { + Seq() + } + (p._1, latestSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } else { + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } + }).sum + case (_, _) => + // Fetch and store latest base files and its sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + val fileSlices = specifiedQueryInstant + .map(instant => + fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.path, instant, true)) + .getOrElse(fileSystemView.getLatestFileSlices(p._1.path)) + .iterator().asScala.toSeq + (p._1, fileSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum + } + + // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. + queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values.isEmpty) + val flushSpend = System.currentTimeMillis() - startTime + + logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," + + s" spend: $flushSpend ms") + } + + protected def refresh(): Unit = { + fileStatusCache.invalidate() + refresh0() + } + + private def fileSliceSize(fileSlice: FileSlice): Long = { + val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + logFileSize + } else { + logFileSize + } + } + + /** + * Load all partition paths and it's files under the query table path. + */ + private def loadPartitionPathFiles(): Map[PartitionPath, Array[FileStatus]] = { + val partitionPaths = getAllQueryPartitionPaths + // List files in all of the partition path. 
+ val pathToFetch = mutable.ArrayBuffer[PartitionPath]() + val cachePartitionToFiles = mutable.Map[PartitionPath, Array[FileStatus]]() + // Fetch from the FileStatusCache + partitionPaths.foreach { partitionPath => + fileStatusCache.get(partitionPath.fullPartitionPath(basePath)) match { + case Some(filesInPartition) => + cachePartitionToFiles.put(partitionPath, filesInPartition) + + case None => pathToFetch.append(partitionPath) + } + } + + val fetchedPartitionToFiles = + if (pathToFetch.nonEmpty) { + val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap + val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, + fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) + fullPartitionPathsToFetch.map(p => { + (p._1, partitionToFilesMap.get(p._2)) + }) + } else { + Map.empty[PartitionPath, Array[FileStatus]] + } + + // Update the fileStatusCache + fetchedPartitionToFiles.foreach { + case (partitionRowPath, filesInPartition) => + fileStatusCache.put(partitionRowPath.fullPartitionPath(basePath), filesInPartition) + } + cachePartitionToFiles.toMap ++ fetchedPartitionToFiles + } + + def getAllQueryPartitionPaths: Seq[PartitionPath] = { + val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) + // Load all the partition path from the basePath, and filter by the query partition path. + // TODO load files from the queryPartitionPath directly. + val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala + .filter(_.startsWith(queryPartitionPath)) + + val partitionSchema = _partitionColumns + + // Convert partition's path into partition descriptor + partitionPaths.map { partitionPath => + val partitionColumnValues = parsePartitionColumnValues(partitionSchema, partitionPath) + PartitionPath(partitionPath, partitionColumnValues) + } + } + + /** + * Parses partition columns' values from the provided partition's path, returning list of + * values (that might have engine-specific representation) + * + * @param partitionColumns partitioning columns identifying the partition + * @param partitionPath partition's path to parse partitioning columns' values from + */ + protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] + + // TODO eval whether we should just use logger directly + protected def logWarning(str: => String): Unit + protected def logInfo(str: => String): Unit + + /** + * Represents a partition as a tuple of + *
    + *
+ * <ul>
+ *   <li>Actual partition path (relative to the table's base path)</li>
+ *   <li>Values of the corresponding columns the table is being partitioned by (partitioning columns)</li>
+ * </ul>
+ * + * E.g. PartitionPath("2021/02/01", Array("2021","02","01")) + * + * NOTE: Partitioning column values might have engine specific representation (for ex, + * {@code UTF8String} for Spark, etc) and are solely used in partition pruning in an very + * engine-specific ways + * + * @param values values of the corresponding partitioning columns + * @param path partition's path + * + * TODO expose as a trait and make impls engine-specific (current impl is tailored for Spark) + */ + case class PartitionPath(path: String, values: Array[Any]) { + override def equals(other: Any): Boolean = other match { + case PartitionPath(otherPath, _) => path == otherPath + case _ => false + } + + override def hashCode(): Int = { + path.hashCode + } + + def fullPartitionPath(basePath: String): Path = { + if (path.isEmpty) { + new Path(basePath) // This is a non-partition path + } else { + new Path(basePath, path) + } + } + } +} + +trait FileStatusCacheTrait { + def get(path: Path): Option[Array[FileStatus]] + def put(path: Path, leafFiles: Array[FileStatus]): Unit + def invalidate(): Unit +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index a57691f9f5cf4..b15cde597a193 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -193,7 +193,6 @@ class DefaultSource extends RelationProvider } if (useHoodieFileIndex) { - val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, if (schema == null) Option.empty[StructType] else Some(schema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index a6c63660e82cf..d7aa94f485adf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -18,34 +18,19 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} -import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.FileSlice -import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ -import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} - -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate} -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.hudi.HoodieFileIndex.getConfigProperties +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.table.HoodieTableMetaClient +import 
org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{And, Expression} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Column, SparkSession} -import org.apache.spark.unsafe.types.UTF8String -import java.util.Properties - -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.util.{Failure, Success, Try} /** @@ -69,122 +54,123 @@ import scala.util.{Failure, Success, Try} * , we read it as a Non-Partitioned table because we cannot know how to mapping the partition * path with the partition columns in this case. * + * TODO rename to HoodieSparkSqlFileIndex */ -case class HoodieFileIndex( - spark: SparkSession, - metaClient: HoodieTableMetaClient, - schemaSpec: Option[StructType], - options: Map[String, String], - @transient fileStatusCache: FileStatusCache = NoopCache) - extends FileIndex with Logging with SparkAdapterSupport { - - private val basePath = metaClient.getBasePath +case class HoodieFileIndex(spark: SparkSession, + metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType], + options: Map[String, String], + @transient fileStatusCache: FileStatusCache = NoopCache) + extends SparkHoodieTableFileIndex( + spark = spark, + metaClient = metaClient, + schemaSpec = schemaSpec, + configProperties = getConfigProperties(spark, options), + specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), + fileStatusCache = fileStatusCache + ) + with FileIndex { @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) - private val queryType = options(QUERY_TYPE.key()) - - private val tableType = metaClient.getTableType - - private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlCommonUtils.formatQueryInstant) - - /** - * Get all completeCommits. - */ - lazy val completedCommits = metaClient.getCommitsTimeline - .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp) + override def rootPaths: Seq[Path] = queryPath :: Nil - /** - * Get the schema of the table. - */ - lazy val schema: StructType = schemaSpec.getOrElse({ - val schemaUtil = new TableSchemaResolver(metaClient) - AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) - }) + def enableDataSkipping(): Boolean = { + options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), + spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean + } /** - * Get the partition schema from the hoodie.properties. + * Returns the FileStatus for all the base files (excluding log files). This should be used only for + * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic + * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter + * to filter out log files within Spark. 
+ * + * @return List of FileStatus for base files */ - private lazy val _partitionSchemaFromProperties: StructType = { - val tableConfig = metaClient.getTableConfig - val partitionColumns = tableConfig.getPartitionFields - val nameFieldMap = generateNameFieldMap(Right(schema)) - - if (partitionColumns.isPresent) { - if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName) - || tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) { - val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) - StructType(partitionFields) - } else { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + - s"$column' in the schema[${schema.fields.mkString(",")}]"))) - StructType(partitionFields) - } - } else { // If the partition columns have not stored in hoodie.properties(the table that was - // created earlier), we trait it as a non-partitioned table. - logWarning("No partition columns available from hoodie.properties." + - " Partition pruning will not work") - new StructType() - } + def allFiles: Seq[FileStatus] = { + cachedAllInputFileSlices.values.flatten + .filter(_.getBaseFile.isPresent) + .map(_.getBaseFile.get().getFileStatus) + .toSeq } /** - * This method traverses StructType recursively to build map of columnName -> StructField - * Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding - * only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"] - * @param structField - * @return map of ( columns names -> StructField ) + * Invoked by Spark to fetch list of latest base files per partition. + * + * @param partitionFilters partition column filters + * @param dataFilters data columns filters + * @return list of PartitionDirectory containing partition to base files mapping */ - private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = { - structField match { - case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap - case Left(field) => field.dataType match { - case struct: StructType => generateNameFieldMap(Right(struct)).map { - case (key: String, sf: StructField) => (field.name + "." 
+ key, sf) + override def listFiles(partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + // Look up candidate files names in the col-stats index, if all of the following conditions are true + // - Data-skipping is enabled + // - Col-Stats Index is present + // - List of predicates (filters) is present + val candidateFilesNamesOpt: Option[Set[String]] = + lookupCandidateFilesInColStatsIndex(dataFilters) match { + case Success(opt) => opt + case Failure(e) => + if (e.isInstanceOf[AnalysisException]) { + logDebug("Failed to relay provided data filters to Z-index lookup", e) + } else { + logError("Failed to lookup candidate files in Z-index", e) } - case _ => Map(field.name -> field) - } + Option.empty } - } - private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") - private lazy val configProperties = { - val sqlConf: SQLConf = spark.sessionState.conf - val properties = new Properties() + if (queryAsNonePartitionedTable) { + // Read as Non-Partitioned table + // Filter in candidate files based on the col-stats index lookup + val candidateFiles = + allFiles.filter(fileStatus => + // NOTE: This predicate is true when {@code Option} is empty + candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)) + ) - // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users - // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. - properties.setProperty(HoodieMetadataConfig.ENABLE.key(), - sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(), - HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) - properties.putAll(options.asJava) - properties - } + logInfo(s"Total files : ${allFiles.size}; " + + s"candidate files after data skipping: ${candidateFiles.size}; " + + s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") - private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder() - .fromProperties(configProperties) - .build() + Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) + } else { + // Prune the partition path by the partition filters + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) + var totalFileSize = 0 + var candidateFileSize = 0 - private lazy val metadataConfig = HoodieMetadataConfig.newBuilder - .fromProperties(configProperties) - .build() + val result = prunedPartitions.map { partition => + val baseFileStatuses: Seq[FileStatus] = + cachedAllInputFileSlices(partition) + .map(fs => fs.getBaseFile.orElse(null)) + .filter(_ != null) + .map(_.getFileStatus) - @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ - @transient @volatile private var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _ - @transient @volatile private var cachedFileSize: Long = 0L + // Filter in candidate files based on the col-stats index lookup + val candidateFiles = + baseFileStatuses.filter(fs => + // NOTE: This predicate is true when {@code Option} is empty + candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName))) - @volatile private var queryAsNonePartitionedTable: Boolean = _ + totalFileSize += baseFileStatuses.size + candidateFileSize += candidateFiles.size + 
PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles) + } - refresh0() + logInfo(s"Total base files: ${totalFileSize}; " + + s"candidate files after data skipping : ${candidateFileSize}; " + + s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}") - override def rootPaths: Seq[Path] = queryPath :: Nil + result + } + } - def enableDataSkipping(): Boolean = { - options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), - spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean + private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = { + val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet + allBaseFileNames -- allIndexedFileNames } /** @@ -193,8 +179,8 @@ case class HoodieFileIndex( * "num_nulls" statistics for all clustered columns. * * NOTE: This method has to return complete set of candidate files, since only provided candidates will - * ultimately be scanned as part of query execution. Hence, this method has to maintain the - * invariant of conservatively including every base-file's name, that is NOT referenced in its index. + * ultimately be scanned as part of query execution. Hence, this method has to maintain the + * invariant of conservatively including every base-file's name, that is NOT referenced in its index. * * @param queryFilters list of original data filters passed down from querying engine * @return list of pruned (data-skipped) candidate base-files' names @@ -264,390 +250,36 @@ case class HoodieFileIndex( // files and all outstanding base-files, and make sure that all base files not // represented w/in the index are included in the output of this method val notIndexedFileNames = - lookupFileNamesMissingFromIndex(allIndexedFileNames) + lookupFileNamesMissingFromIndex(allIndexedFileNames) prunedCandidateFileNames ++ notIndexedFileNames }) } - private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = { - val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet - allBaseFileNames -- allIndexedFileNames - } - - /** - * Invoked by Spark to fetch list of latest base files per partition. 
- * - * @param partitionFilters partition column filters - * @param dataFilters data columns filters - * @return list of PartitionDirectory containing partition to base files mapping - */ - override def listFiles(partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - // Look up candidate files names in the col-stats index, if all of the following conditions are true - // - Data-skipping is enabled - // - Col-Stats Index is present - // - List of predicates (filters) is present - val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInColStatsIndex(dataFilters) match { - case Success(opt) => opt - case Failure(e) => - if (e.isInstanceOf[AnalysisException]) { - logDebug("Failed to relay provided data filters to Z-index lookup", e) - } else { - logError("Failed to lookup candidate files in Z-index", e) - } - Option.empty - } - - logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") - - if (queryAsNonePartitionedTable) { - // Read as Non-Partitioned table - // Filter in candidate files based on the col-stats index lookup - val candidateFiles = - allFiles.filter(fileStatus => - // NOTE: This predicate is true when {@code Option} is empty - candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)) - ) - - logInfo(s"Total files : ${allFiles.size}; " + - s"candidate files after data skipping: ${candidateFiles.size}; " + - s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") - - Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) - } else { - // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) - var totalFileSize = 0 - var candidateFileSize = 0 - - val result = prunedPartitions.map { partition => - val baseFileStatuses: Seq[FileStatus] = - cachedAllInputFileSlices(partition) - .map(fs => fs.getBaseFile.orElse(null)) - .filter(_ != null) - .map(_.getFileStatus) - - // Filter in candidate files based on the col-stats index lookup - val candidateFiles = - baseFileStatuses.filter(fs => - // NOTE: This predicate is true when {@code Option} is empty - candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName))) - - totalFileSize += baseFileStatuses.size - candidateFileSize += candidateFiles.size - PartitionDirectory(partition.values, candidateFiles) - } - - logInfo(s"Total base files: ${totalFileSize}; " + - s"candidate files after data skipping : ${candidateFileSize}; " + - s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}") - - result - } - } - - /** - * Fetch list of latest base files and log files per partition. - * - * @param partitionFilters partition column filters - * @param dataFilters data column filters - * @return mapping from string partition paths to its base/log files - */ - def listFileSlices(partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { - if (queryAsNonePartitionedTable) { - // Read as Non-Partitioned table. 
- cachedAllInputFileSlices.map(entry => (entry._1.partitionPath, entry._2)) - } else { - // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) - prunedPartitions.map(partition => { - (partition.partitionPath, cachedAllInputFileSlices(partition)) - }).toMap - } - } + override def refresh(): Unit = super.refresh() override def inputFiles: Array[String] = { val fileStatusList = allFiles fileStatusList.map(_.getPath.toString).toArray } - override def refresh(): Unit = { - fileStatusCache.invalidateAll() - refresh0() - } - - private def refresh0(): Unit = { - val startTime = System.currentTimeMillis() - val partitionFiles = loadPartitionPathFiles() - val allFiles = partitionFiles.values.reduceOption(_ ++ _) - .getOrElse(Array.empty[FileStatus]) - - metaClient.reloadActiveTimeline() - val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants - val latestInstant = activeInstants.lastInstant() - fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) - val queryInstant = if (specifiedQueryInstant.isDefined) { - specifiedQueryInstant - } else if (latestInstant.isPresent) { - Some(latestInstant.get.getTimestamp) - } else { - None - } - - (tableType, queryType) match { - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => - // Fetch and store latest base and log files, and their sizes - cachedAllInputFileSlices = partitionFiles.map(p => { - val latestSlices = if (latestInstant.isPresent) { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get) - .iterator().asScala.toSeq - } else { - Seq() - } - (p._1, latestSlices) - }) - cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum - } else { - fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum - } - }).sum - case (_, _) => - // Fetch and store latest base files and its sizes - cachedAllInputFileSlices = partitionFiles.map(p => { - val fileSlices = specifiedQueryInstant - .map(instant => - fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true)) - .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath)) - .iterator().asScala.toSeq - (p._1, fileSlices) - }) - cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum - } - - // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. - queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty) - val flushSpend = System.currentTimeMillis() - startTime - logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," + - s" spend: $flushSpend ms") - } - - private def fileSliceSize(fileSlice: FileSlice): Long = { - val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileLen + logFileSize - } else { - logFileSize - } - } - override def sizeInBytes: Long = { cachedFileSize } +} - override def partitionSchema: StructType = { - if (queryAsNonePartitionedTable) { - // If we read it as Non-Partitioned table, we should not - // return the partition schema. - new StructType() - } else { - _partitionSchemaFromProperties - } - } - - /** - * Get the data schema of the table. 
- * @return - */ - def dataSchema: StructType = { - val partitionColumns = partitionSchema.fields.map(_.name).toSet - StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) - } - - /** - * Returns the FileStatus for all the base files (excluding log files). This should be used only for - * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic - * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter - * to filter out log files within Spark. - * - * @return List of FileStatus for base files - */ - def allFiles: Seq[FileStatus] = { - cachedAllInputFileSlices.values.flatten - .filter(_.getBaseFile.isPresent) - .map(_.getBaseFile.get().getFileStatus) - .toSeq - } - - /** - * Prune the partition by the filter.This implementation is fork from - * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. - * @param partitionPaths All the partition paths. - * @param predicates The filter condition. - * @return The Pruned partition paths. - */ - def prunePartition(partitionPaths: Seq[PartitionRowPath], - predicates: Seq[Expression]): Seq[PartitionRowPath] = { - - val partitionColumnNames = partitionSchema.fields.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate(predicate.transform { - case a: AttributeReference => - val index = partitionSchema.indexWhere(a.name == _.name) - BoundReference(index, partitionSchema(index).dataType, nullable = true) - }) - - val prunedPartitionPaths = partitionPaths.filter { - case PartitionRowPath(values, _) => boundPredicate.eval(values) - } - logInfo(s"Total partition size is: ${partitionPaths.size}," + - s" after partition prune size is: ${prunedPartitionPaths.size}") - prunedPartitionPaths - } else { - partitionPaths - } - } - - def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { - val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) - // Load all the partition path from the basePath, and filter by the query partition path. - // TODO load files from the queryPartitionPath directly. - val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala - .filter(_.startsWith(queryPartitionPath)) - - val partitionSchema = _partitionSchemaFromProperties - val timeZoneId = CaseInsensitiveMap(options) - .get(DateTimeUtils.TIMEZONE_OPTION) - .getOrElse(SQLConf.get.sessionLocalTimeZone) - - val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark - .sessionState.conf) - // Convert partition path to PartitionRowPath - partitionPaths.map { partitionPath => - val partitionRow = if (partitionSchema.fields.length == 0) { - // This is a non-partitioned table - InternalRow.empty - } else { - val partitionFragments = partitionPath.split("/") - - if (partitionFragments.length != partitionSchema.fields.length && - partitionSchema.fields.length == 1) { - // If the partition column size is not equal to the partition fragment size - // and the partition column size is 1, we map the whole partition path - // to the partition column which can benefit from the partition prune. 
- val prefix = s"${partitionSchema.fieldNames.head}=" - val partitionValue = if (partitionPath.startsWith(prefix)) { - // support hive style partition path - partitionPath.substring(prefix.length) - } else { - partitionPath - } - InternalRow.fromSeq(Seq(UTF8String.fromString(partitionValue))) - } else if (partitionFragments.length != partitionSchema.fields.length && - partitionSchema.fields.length > 1) { - // If the partition column size is not equal to the partition fragments size - // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns. So we trait it as a Non-Partitioned Table - // for the query which do not benefit from the partition prune. - logWarning( s"Cannot do the partition prune for table $basePath." + - s"The partitionFragments size (${partitionFragments.mkString(",")})" + - s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})") - InternalRow.empty - } else { // If partitionSeqs.length == partitionSchema.fields.length - - // Append partition name to the partition value if the - // HIVE_STYLE_PARTITIONING is disable. - // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" - val partitionWithName = - partitionFragments.zip(partitionSchema).map { - case (partition, field) => - if (partition.indexOf("=") == -1) { - s"${field.name}=$partition" - } else { - partition - } - }.mkString("/") - val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap - - sparkParsePartitionUtil.parsePartition(pathWithPartitionName, - typeInference = false, Set(new Path(basePath)), partitionDataTypes, - DateTimeUtils.getTimeZone(timeZoneId)) - } - } - PartitionRowPath(partitionRow, partitionPath) - } - } - - /** - * Load all partition paths and it's files under the query table path. - */ - private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { - val partitionRowPaths = getAllQueryPartitionPaths - // List files in all of the partition path. - val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() - val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() - // Fetch from the FileStatusCache - partitionRowPaths.foreach { partitionRowPath => - fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match { - case Some(filesInPartition) => - cachePartitionToFiles.put(partitionRowPath, filesInPartition) - - case None => pathToFetch.append(partitionRowPath) - } - } - - val fetchedPartitionToFiles = - if (pathToFetch.nonEmpty) { - val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap - val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, - fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) - fullPartitionPathsToFetch.map(p => { - (p._1, partitionToFilesMap.get(p._2)) - }) - } else { - Map.empty[PartitionRowPath, Array[FileStatus]] - } - - // Update the fileStatusCache - fetchedPartitionToFiles.foreach { - case (partitionRowPath, filesInPartition) => - fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) - } - cachePartitionToFiles.toMap ++ fetchedPartitionToFiles - } - - /** - * Represent a partition path. - * e.g. PartitionPath(InternalRow("2021","02","01"), "2021/02/01")) - * @param values The partition values of this partition path. 
- * @param partitionPath The partition path string. - */ - case class PartitionRowPath(values: InternalRow, partitionPath: String) { - override def equals(other: Any): Boolean = other match { - case PartitionRowPath(_, otherPath) => partitionPath == otherPath - case _ => false - } +object HoodieFileIndex { - override def hashCode(): Int = { - partitionPath.hashCode - } + def getConfigProperties(spark: SparkSession, options: Map[String, String]) = { + val sqlConf: SQLConf = spark.sessionState.conf + val properties = new TypedProperties() - def fullPartitionPath(basePath: String): Path = { - if (partitionPath.isEmpty) { - new Path(basePath) // This is a non-partition path - } else { - new Path(basePath, partitionPath) - } - } + // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users + // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. + properties.setProperty(HoodieMetadataConfig.ENABLE.key(), + sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(), + HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) + properties.putAll(options.asJava) + properties } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index b4a9800d994b9..bc83a85415de2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -168,31 +168,31 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus) // Iterate partitions to create splits - val fileGroup = getWritePartitionPaths(metadataList).flatMap(partitionPath => + val fileGroups = getWritePartitionPaths(metadataList).flatMap(partitionPath => fsView.getAllFileGroups(partitionPath).iterator() ).toList - val latestCommit = fsView.getLastInstant.get().getTimestamp + val latestCommit = fsView.getLastInstant.get.getTimestamp if (log.isDebugEnabled) { - fileGroup.foreach(f => log.debug(s"current file group id: " + - s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}")) + fileGroups.foreach(f => log.debug(s"current file group id: " + + s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}")) } // Filter files based on user defined glob pattern val pathGlobPattern = optParams.getOrElse( DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue) - val filteredFileGroup = if(!pathGlobPattern - .equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) { + val filteredFileGroup = if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) { val globMatcher = new GlobPattern("*" + pathGlobPattern) - fileGroup.filter(f => { - if (f.getLatestFileSlice.get().getBaseFile.isPresent) { - globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath) + fileGroups.filter(fg => { + val latestFileSlice = fg.getLatestFileSlice.get + if (latestFileSlice.getBaseFile.isPresent) { + globMatcher.matches(latestFileSlice.getBaseFile.get.getPath) } else { - globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString) + 
globMatcher.matches(latestFileSlice.getLatestLogFile.get.getPath.toString) } }) } else { - fileGroup + fileGroups } // Build HoodieMergeOnReadFileSplit. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index b30e5731021ea..c4d670bb62f8a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hudi.HoodieSqlCommonUtils @@ -198,9 +199,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, // If convert success to catalyst expression, use the partition prune val fileSlices = if (partitionFilterExpression.isDefined) { - hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get), Seq.empty) + hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get)) } else { - hoodieFileIndex.listFileSlices(Seq.empty, Seq.empty) + hoodieFileIndex.listFileSlices(Seq.empty[Expression]) } if (fileSlices.isEmpty) { @@ -223,6 +224,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala .map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths) + HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }).toList diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala new file mode 100644 index 0000000000000..e4f7b82e9206e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.SparkHoodieTableFileIndex.generateFieldMap +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +/** + * Implementation of the [[AbstractHoodieTableFileIndex]] for Spark + * + * @param spark spark session + * @param metaClient Hudi table's meta-client + * @param schemaSpec optional table's schema + * @param configProperties unifying configuration (in the form of generic properties) + * @param specifiedQueryInstant instant as of which table is being queried + * @param fileStatusCache transient cache of fetched [[FileStatus]]es + */ +class SparkHoodieTableFileIndex(spark: SparkSession, + metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType], + configProperties: TypedProperties, + specifiedQueryInstant: Option[String] = None, + @transient fileStatusCache: FileStatusCache = NoopCache) + extends AbstractHoodieTableFileIndex( + engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), + metaClient, + configProperties, + specifiedQueryInstant, + SparkHoodieTableFileIndex.adapt(fileStatusCache) + ) + with SparkAdapterSupport + with Logging { + + /** + * Get the schema of the table. + */ + lazy val schema: StructType = schemaSpec.getOrElse({ + val schemaUtil = new TableSchemaResolver(metaClient) + AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) + }) + + private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf) + + /** + * Get the partition schema from the hoodie.properties. 
+ */ + private lazy val _partitionSchemaFromProperties: StructType = { + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionFields + val nameFieldMap = generateFieldMap(schema) + + if (partitionColumns.isPresent) { + if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName) + || tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) { + val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) + StructType(partitionFields) + } else { + val partitionFields = partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + StructType(partitionFields) + } + } else { + // If the partition columns have not stored in hoodie.properties(the table that was + // created earlier), we trait it as a non-partitioned table. + logWarning("No partition columns available from hoodie.properties." + + " Partition pruning will not work") + new StructType() + } + } + + /** + * Get the data schema of the table. + * + * @return + */ + def dataSchema: StructType = { + val partitionColumns = partitionSchema.fields.map(_.name).toSet + StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) + } + + def partitionSchema: StructType = { + if (queryAsNonePartitionedTable) { + // If we read it as Non-Partitioned table, we should not + // return the partition schema. + new StructType() + } else { + _partitionSchemaFromProperties + } + } + + /** + * Fetch list of latest base files w/ corresponding log files, after performing + * partition pruning + * + * @param partitionFilters partition column filters + * @return mapping from string partition paths to its base/log files + */ + def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { + // Prune the partition path by the partition filters + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) + prunedPartitions.map(partition => { + (partition.path, cachedAllInputFileSlices(partition)) + }).toMap + } + + /** + * Prune the partition by the filter.This implementation is fork from + * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. + * + * @param partitionPaths All the partition paths. + * @param predicates The filter condition. + * @return The Pruned partition paths. 
+ */ + def prunePartition(partitionPaths: Seq[PartitionPath], + predicates: Seq[Expression]): Seq[PartitionPath] = { + + val partitionColumnNames = partitionSchema.fields.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate(predicate.transform { + case a: AttributeReference => + val index = partitionSchema.indexWhere(a.name == _.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + + val prunedPartitionPaths = partitionPaths.filter { + case PartitionPath(_, values) => boundPredicate.eval(InternalRow.fromSeq(values)) + } + logInfo(s"Total partition size is: ${partitionPaths.size}," + + s" after partition prune size is: ${prunedPartitionPaths.size}") + prunedPartitionPaths + } else { + partitionPaths + } + } + + protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] = { + if (partitionColumns.length == 0) { + // This is a non-partitioned table + Array.empty + } else { + val partitionFragments = partitionPath.split("/") + + if (partitionFragments.length != partitionColumns.length && + partitionColumns.length == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. + val prefix = s"${partitionColumns.head}=" + val partitionValue = if (partitionPath.startsWith(prefix)) { + // support hive style partition path + partitionPath.substring(prefix.length) + } else { + partitionPath + } + Array(UTF8String.fromString(partitionValue)) + } else if (partitionFragments.length != partitionColumns.length && + partitionColumns.length > 1) { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns. So we trait it as a Non-Partitioned Table + // for the query which do not benefit from the partition prune. + logWarning(s"Cannot do the partition prune for table $basePath." + + s"The partitionFragments size (${partitionFragments.mkString(",")})" + + s" is not equal to the partition columns size(${partitionColumns.mkString(",")})") + Array.empty + } else { + // If partitionSeqs.length == partitionSchema.fields.length + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING is disable. + // e.g. 
convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + val partitionWithName = + partitionFragments.zip(partitionColumns).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") + + val pathWithPartitionName = new Path(basePath, partitionWithName) + val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema) + + partitionValues.toArray + } + } + } + + private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { + val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone) + val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap + + sparkParsePartitionUtil.parsePartition( + partitionPath, + typeInference = false, + Set(new Path(basePath)), + partitionDataTypes, + DateTimeUtils.getTimeZone(timeZoneId) + ) + .toSeq(partitionSchema) + } +} + +object SparkHoodieTableFileIndex { + + /** + * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding + * [[StructField]] object for every field of the provided [[StructType]], recursively. + * + * For example, following struct + *
+   *   StructType(
+   *     StructField("a",
+   *       StructType(
+   *          StructField("b", StringType),
+   *          StructField("c", IntegerType)
+   *       )
+   *     )
+   *   )
+   * 
+   *
+   * will be converted into the following mapping:
+   *
+   *   "a.b" -> StructField("b", StringType),
+   *   "a.c" -> StructField("c", IntegerType),
+   * 
+ */ + private def generateFieldMap(structType: StructType) : Map[String, StructField] = { + def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = { + structField match { + case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap + case Left(field) => field.dataType match { + case struct: StructType => traverse(Right(struct)).map { + case (key, structField) => (s"${field.name}.$key", structField) + } + case _ => Map(field.name -> field) + } + } + } + + traverse(Right(structType)) + } + + private def adapt(cache: FileStatusCache): FileStatusCacheTrait = { + new FileStatusCacheTrait { + override def get(path: Path): Option[Array[FileStatus]] = cache.getLeafFiles(path) + override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.putLeafFiles(path, leafFiles) + override def invalidate(): Unit = cache.invalidateAll() + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 62f98cf53617d..d4f7cede7f6d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -273,7 +273,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString)) // test if table is partitioned on nested columns, getAllQueryPartitionPaths does not break - assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c")) + assert(fileIndex.getAllQueryPartitionPaths.get(0).path.equals("c")) } private def attribute(partition: String): AttributeReference = { diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala index 1d23a84bc089e..6f29053aef212 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -143,7 +143,7 @@ class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil { (None, Some(path)) } else { val (columnNames, values) = columns.reverse.unzip - (Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath)) + (Some(PartitionValues(columnNames, values)), Some(currentPath)) } }
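For illustration only (not part of the patch): a minimal, self-contained Scala sketch of the dot-path flattening that the new generateFieldMap helper performs. The object name FieldMapSketch and the toy schema are invented for this example; only Spark's StructType API is assumed.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object FieldMapSketch {

  // Recursively flatten a StructType into a map of dot-path -> StructField,
  // keeping only leaf fields (e.g. "a.b", "a.c" but not the intermediate "a"),
  // mirroring the generateFieldMap helper added in SparkHoodieTableFileIndex.
  def flatten(structType: StructType): Map[String, StructField] = {
    def traverse(node: Either[StructField, StructType]): Map[String, StructField] = node match {
      case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
      case Left(field) => field.dataType match {
        case nested: StructType =>
          traverse(Right(nested)).map { case (key, sf) => (s"${field.name}.$key", sf) }
        case _ => Map(field.name -> field)
      }
    }
    traverse(Right(structType))
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("b", StringType),
        StructField("c", IntegerType))))))

    // Expected output (order may vary): a.b -> string, a.c -> int
    flatten(schema).foreach { case (path, field) => println(s"$path -> ${field.dataType.simpleString}") }
  }
}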