From 2cb8a44616d74578841ca098f11f9eb2ba99fd4a Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Dec 2021 19:20:19 -0800 Subject: [PATCH 01/22] Extracted `HoodieTableFileIndex` component abstracting handling of the Hudi tables file listing, filtering; Extracted `SparkHoodieTableFileIndex` to bear Spark specific extensions of the `HoodieTableFileIndex` --- .../apache/hudi/HoodieTableFileIndex.scala | 345 ++++++++++++++++++ .../hudi/SparkHoodieTableFileIndex.scala | 161 ++++++++ 2 files changed, 506 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala new file mode 100644 index 0000000000000..41000a40e2336 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.engine.HoodieEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} +import org.apache.spark.sql.hudi.HoodieSqlUtils +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * A file index which support partition prune for hoodie snapshot and read-optimized query. + * + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * There are 3 cases for this: + * 1、If the partition columns size is equal to the actually partition path level, we + * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10") + * + * 2、If the partition columns size is not equal to the partition path level, but the partition + * column size is "1" (e.g. 
partition column is "dt", but the partition path is "2021/03/10" + * who's directory level is 3).We can still read it as a partitioned table. We will mapping the + * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). + * + * 3、Else the the partition columns size is not equal to the partition directory level and the + * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12") + * , we read it as a Non-Partitioned table because we cannot know how to mapping the partition + * path with the partition columns in this case. + * + */ +abstract class HoodieTableFileIndex(engineContext: HoodieEngineContext, + metaClient: HoodieTableMetaClient, + configProperties: TypedProperties, + @transient fileStatusCache: FileStatusCache = NoopCache) { + /** + * Get all completeCommits. + */ + lazy val completedCommits = metaClient.getCommitsTimeline + .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp) + /** + * Get the partition schema from the hoodie.properties. + */ + private lazy val _partitionColumns: Array[String] = + metaClient.getTableConfig.getPartitionFields.orElse(Array[String]()) + + private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder() + .fromProperties(configProperties) + .build() + private lazy val metadataConfig = HoodieMetadataConfig.newBuilder + .fromProperties(configProperties) + .build() + protected val basePath: String = metaClient.getBasePath + + private val queryType = configProperties(QUERY_TYPE.key()) + private val tableType = metaClient.getTableType + + private val specifiedQueryInstant = + Option.apply(configProperties.getString(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)) + .map(HoodieSqlUtils.formatQueryInstant) + + @transient private val queryPath = new Path(configProperties.getOrElse("path", "'path' option required")) + @transient + @volatile protected var cachedFileSize: Long = 0L + @transient + @volatile protected var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _ + @volatile protected var queryAsNonePartitionedTable: Boolean = _ + @transient + @volatile private var fileSystemView: HoodieTableFileSystemView = _ + + refresh0() + + /** + * Fetch list of latest base files and log files per partition. + * + * @return mapping from string partition paths to its base/log files + */ + def listFileSlices(): Map[String, Seq[FileSlice]] = { + if (queryAsNonePartitionedTable) { + // Read as Non-Partitioned table. + cachedAllInputFileSlices.map(entry => (entry._1.partitionPath, entry._2)) + } else { + cachedAllInputFileSlices.keys.toSeq.map(partition => { + (partition.partitionPath, cachedAllInputFileSlices(partition)) + }).toMap + } + } + + /** + * Returns the FileStatus for all the base files (excluding log files). This should be used only for + * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic + * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter + * to filter out log files within Spark. 
+ * + * @return List of FileStatus for base files + */ + def allFiles: Seq[FileStatus] = { + cachedAllInputFileSlices.values.flatten + .filter(_.getBaseFile.isPresent) + .map(_.getBaseFile.get().getFileStatus) + .toSeq + } + + def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { + val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) + // Load all the partition path from the basePath, and filter by the query partition path. + // TODO load files from the queryPartitionPath directly. + val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala + .filter(_.startsWith(queryPartitionPath)) + + val partitionSchema = _partitionColumns + + // Convert partition path to PartitionRowPath + partitionPaths.map { partitionPath => + val partitionRow: Array[Any] = if (partitionSchema.length == 0) { + // This is a non-partitioned table + Array.empty + } else { + val partitionFragments = partitionPath.split("/") + + if (partitionFragments.length != partitionSchema.length && + partitionSchema.length == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. + val prefix = s"${partitionSchema.head}=" + val partitionValue = if (partitionPath.startsWith(prefix)) { + // support hive style partition path + partitionPath.substring(prefix.length) + } else { + partitionPath + } + // TODO replace w/ byte array + Array(UTF8String.fromString(partitionValue)) + } else if (partitionFragments.length != partitionSchema.length && + partitionSchema.length > 1) { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns. So we trait it as a Non-Partitioned Table + // for the query which do not benefit from the partition prune. + logWarning(s"Cannot do the partition prune for table $basePath." + + s"The partitionFragments size (${partitionFragments.mkString(",")})" + + s" is not equal to the partition columns size(${partitionSchema.mkString(",")})") + Array.empty + } else { // If partitionSeqs.length == partitionSchema.fields.length + + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING is disable. + // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + val partitionWithName = + partitionFragments.zip(partitionSchema).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") + val pathWithPartitionName = new Path(basePath, partitionWithName) + val partitionValues = parsePartitionValuesFromPath(pathWithPartitionName) + .getOrElse(Seq()) + .map(_._2) + .toArray + + partitionValues + } + } + PartitionRowPath(partitionRow, partitionPath) + } + } + + protected def refresh(): Unit = { + fileStatusCache.invalidateAll() + refresh0() + } + + private def fileSliceSize(fileSlice: FileSlice): Long = { + val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + logFileSize + } else { + logFileSize + } + } + + /** + * Load all partition paths and it's files under the query table path. 
+ */ + private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { + val partitionRowPaths = getAllQueryPartitionPaths + // List files in all of the partition path. + val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() + val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() + // Fetch from the FileStatusCache + partitionRowPaths.foreach { partitionRowPath => + fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match { + case Some(filesInPartition) => + cachePartitionToFiles.put(partitionRowPath, filesInPartition) + + case None => pathToFetch.append(partitionRowPath) + } + } + + val fetchedPartitionToFiles = + if (pathToFetch.nonEmpty) { + val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap + val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, + fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) + fullPartitionPathsToFetch.map(p => { + (p._1, partitionToFilesMap.get(p._2)) + }) + } else { + Map.empty[PartitionRowPath, Array[FileStatus]] + } + + // Update the fileStatusCache + fetchedPartitionToFiles.foreach { + case (partitionRowPath, filesInPartition) => + fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) + } + cachePartitionToFiles.toMap ++ fetchedPartitionToFiles + } + + private def refresh0(): Unit = { + val startTime = System.currentTimeMillis() + val partitionFiles = loadPartitionPathFiles() + val allFiles = partitionFiles.values.reduceOption(_ ++ _) + .getOrElse(Array.empty[FileStatus]) + + metaClient.reloadActiveTimeline() + val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + val latestInstant = activeInstants.lastInstant() + // TODO we can optimize the flow by: + // - First fetch list of files from instants of interest + // - Load FileStatus's + fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) + val queryInstant = if (specifiedQueryInstant.isDefined) { + specifiedQueryInstant + } else if (latestInstant.isPresent) { + Some(latestInstant.get.getTimestamp) + } else { + None + } + + (tableType, queryType) match { + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => + // Fetch and store latest base and log files, and their sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + // TODO conditional is incorrect -- this should be queryInstant + val latestSlices = if (latestInstant.isPresent) { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get) + .iterator().asScala.toSeq + } else { + Seq() + } + (p._1, latestSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } else { + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } + }).sum + case (_, _) => + // Fetch and store latest base files and its sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + val fileSlices = specifiedQueryInstant + .map(instant => + fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true)) + .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath)) + .iterator().asScala.toSeq + (p._1, fileSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum + } + + // If the 
partition value contains InternalRow.empty, we query it as a non-partitioned table. + queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values.isEmpty) + val flushSpend = System.currentTimeMillis() - startTime + + logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," + + s" spend: $flushSpend ms") + } + + // TODO java-doc + protected def parsePartitionValuesFromPath(pathWithPartitionName: Path): Option[Seq[(String, Any)]] + + // TODO eval whether we should just use logger directly + protected def logWarning(str: => String): Unit + protected def logInfo(str: => String): Unit + + /** + * Represent a partition path. + * e.g. PartitionPath(Array("2021","02","01"), "2021/02/01")) + * + * @param values The partition values of this partition path. + * @param partitionPath The partition path string. + */ + case class PartitionRowPath(values: Array[Any], partitionPath: String) { + override def equals(other: Any): Boolean = other match { + case PartitionRowPath(_, otherPath) => partitionPath == otherPath + case _ => false + } + + override def hashCode(): Int = { + partitionPath.hashCode + } + + def fullPartitionPath(basePath: String): Path = { + if (partitionPath.isEmpty) { + new Path(basePath) // This is a non-partition path + } else { + new Path(basePath, partitionPath) + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala new file mode 100644 index 0000000000000..33cbdf7f3c748 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.engine.HoodieEngineContext +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache, SparkParsePartitionUtil} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +// TODO merge w/ HoodieFileIndex +class SparkHoodieTableFileIndex(spark: SparkSession, + engineContext: HoodieEngineContext, + metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType], + configProperties: TypedProperties, + @transient fileStatusCache: FileStatusCache = NoopCache) + extends HoodieTableFileIndex(engineContext, metaClient, configProperties, fileStatusCache) + with SparkAdapterSupport + with Logging { + + /** + * Get the schema of the table. + */ + lazy val schema: StructType = schemaSpec.getOrElse({ + val schemaUtil = new TableSchemaResolver(metaClient) + AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) + }) + + private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf) + + /** + * Get the partition schema from the hoodie.properties. + */ + private lazy val _partitionSchemaFromProperties: StructType = { + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionFields + + val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap + + if (partitionColumns.isPresent) { + val partitionFields = partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + new StructType(partitionFields) + } else { // If the partition columns have not stored in hoodie.properties(the table that was + // created earlier), we trait it as a non-partitioned table. + logWarning("No partition columns available from hoodie.properties." + + " Partition pruning will not work") + new StructType() + } + } + + /** + * Get the data schema of the table. + * + * @return + */ + def dataSchema: StructType = { + val partitionColumns = partitionSchema.fields.map(_.name).toSet + StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) + } + + def partitionSchema: StructType = { + if (queryAsNonePartitionedTable) { + // If we read it as Non-Partitioned table, we should not + // return the partition schema. 
+ new StructType() + } else { + _partitionSchemaFromProperties + } + } + + /** + * Fetch list of latest base files w/ corresponding log files, after performing + * partition pruning + * + * @param partitionFilters partition column filters + * @return mapping from string partition paths to its base/log files + */ + def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { + // Prune the partition path by the partition filters + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) + prunedPartitions.map(partition => { + (partition.partitionPath, cachedAllInputFileSlices(partition)) + }).toMap + } + + /** + * Prune the partition by the filter.This implementation is fork from + * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. + * + * @param partitionPaths All the partition paths. + * @param predicates The filter condition. + * @return The Pruned partition paths. + */ + def prunePartition(partitionPaths: Seq[PartitionRowPath], + predicates: Seq[Expression]): Seq[PartitionRowPath] = { + + val partitionColumnNames = partitionSchema.fields.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate(predicate.transform { + case a: AttributeReference => + val index = partitionSchema.indexWhere(a.name == _.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + + val prunedPartitionPaths = partitionPaths.filter { + case PartitionRowPath(values, _) => boundPredicate.eval(InternalRow.fromSeq(values)) + } + logInfo(s"Total partition size is: ${partitionPaths.size}," + + s" after partition prune size is: ${prunedPartitionPaths.size}") + prunedPartitionPaths + } else { + partitionPaths + } + } + + override protected def parsePartitionValuesFromPath(pathWithPartitionName: Path): Option[Seq[(String, Any)]] = { + val timeZoneId = Option.apply(configProperties.getString(DateTimeUtils.TIMEZONE_OPTION)) + .getOrElse(SQLConf.get.sessionLocalTimeZone) + val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap + + sparkParsePartitionUtil.parsePartition( + pathWithPartitionName, + typeInference = false, + Set(new Path(basePath)), + partitionDataTypes, + DateTimeUtils.getTimeZone(timeZoneId) + ) + .map(pv => pv.columnNames.zip(pv.literals).map(t => Tuple2(t._1, t._2.value))) + } +} From 1cd8c0a1ed6da8d62febc9f98cff3be63c146577 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 4 Jan 2022 15:58:39 -0800 Subject: [PATCH 02/22] Rebased `HoodieFileIndex` onto `SparkHoodieTableFileIndex` --- .../org/apache/hudi/HoodieFileIndex.scala | 583 +++--------------- 1 file changed, 101 insertions(+), 482 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index a6c63660e82cf..eadd68d7d05cc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -18,34 +18,21 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} 
+import org.apache.hudi.HoodieFileIndex.getConfigProperties import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.FileSlice -import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ -import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} - +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate} -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{And, Expression} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Column, SparkSession} -import org.apache.spark.unsafe.types.UTF8String - -import java.util.Properties -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.util.{Failure, Success, Try} /** @@ -70,121 +57,105 @@ import scala.util.{Failure, Success, Try} * path with the partition columns in this case. * */ -case class HoodieFileIndex( - spark: SparkSession, - metaClient: HoodieTableMetaClient, - schemaSpec: Option[StructType], - options: Map[String, String], - @transient fileStatusCache: FileStatusCache = NoopCache) - extends FileIndex with Logging with SparkAdapterSupport { - - private val basePath = metaClient.getBasePath +case class HoodieFileIndex(spark: SparkSession, + metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType], + options: Map[String, String], + @transient fileStatusCache: FileStatusCache = NoopCache) + extends SparkHoodieTableFileIndex( + engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), + metaClient = metaClient, + schemaSpec = schemaSpec, + configProperties = getConfigProperties(spark, options), + fileStatusCache = fileStatusCache + ) + with FileIndex { @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) - private val queryType = options(QUERY_TYPE.key()) - - private val tableType = metaClient.getTableType - - private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlCommonUtils.formatQueryInstant) - - /** - * Get all completeCommits. - */ - lazy val completedCommits = metaClient.getCommitsTimeline - .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp) - - /** - * Get the schema of the table. 
- */ - lazy val schema: StructType = schemaSpec.getOrElse({ - val schemaUtil = new TableSchemaResolver(metaClient) - AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) - }) + override def rootPaths: Seq[Path] = queryPath :: Nil - /** - * Get the partition schema from the hoodie.properties. - */ - private lazy val _partitionSchemaFromProperties: StructType = { - val tableConfig = metaClient.getTableConfig - val partitionColumns = tableConfig.getPartitionFields - val nameFieldMap = generateNameFieldMap(Right(schema)) - - if (partitionColumns.isPresent) { - if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName) - || tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) { - val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) - StructType(partitionFields) - } else { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + - s"$column' in the schema[${schema.fields.mkString(",")}]"))) - StructType(partitionFields) - } - } else { // If the partition columns have not stored in hoodie.properties(the table that was - // created earlier), we trait it as a non-partitioned table. - logWarning("No partition columns available from hoodie.properties." + - " Partition pruning will not work") - new StructType() - } + def enableDataSkipping(): Boolean = { + options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), + spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean } /** - * This method traverses StructType recursively to build map of columnName -> StructField - * Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding - * only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"] - * @param structField - * @return map of ( columns names -> StructField ) + * Invoked by Spark to fetch list of latest base files per partition. + * + * @param partitionFilters partition column filters + * @param dataFilters data columns filters + * @return list of PartitionDirectory containing partition to base files mapping */ - private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = { - structField match { - case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap - case Left(field) => field.dataType match { - case struct: StructType => generateNameFieldMap(Right(struct)).map { - case (key: String, sf: StructField) => (field.name + "." 
+ key, sf) + override def listFiles(partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + // Look up candidate files names in the col-stats index, if all of the following conditions are true + // - Data-skipping is enabled + // - Col-Stats Index is present + // - List of predicates (filters) is present + val candidateFilesNamesOpt: Option[Set[String]] = + lookupCandidateFilesInColStatsIndex(dataFilters) match { + case Success(opt) => opt + case Failure(e) => + if (e.isInstanceOf[AnalysisException]) { + logDebug("Failed to relay provided data filters to Z-index lookup", e) + } else { + logError("Failed to lookup candidate files in Z-index", e) } - case _ => Map(field.name -> field) - } + Option.empty } - } - private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") - private lazy val configProperties = { - val sqlConf: SQLConf = spark.sessionState.conf - val properties = new Properties() + if (queryAsNonePartitionedTable) { + // Read as Non-Partitioned table + // Filter in candidate files based on the col-stats index lookup + val candidateFiles = + allFiles.filter(fileStatus => + // NOTE: This predicate is true when {@code Option} is empty + candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)) + ) - // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users - // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. - properties.setProperty(HoodieMetadataConfig.ENABLE.key(), - sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(), - HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) - properties.putAll(options.asJava) - properties - } + logInfo(s"Total files : ${allFiles.size}; " + + s"candidate files after data skipping: ${candidateFiles.size}; " + + s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") - private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder() - .fromProperties(configProperties) - .build() + Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) + } else { + // Prune the partition path by the partition filters + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) + var totalFileSize = 0 + var candidateFileSize = 0 - private lazy val metadataConfig = HoodieMetadataConfig.newBuilder - .fromProperties(configProperties) - .build() + val result = prunedPartitions.map { partition => + val baseFileStatuses: Seq[FileStatus] = + cachedAllInputFileSlices(partition) + .map(fs => fs.getBaseFile.orElse(null)) + .filter(_ != null) + .map(_.getFileStatus) - @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ - @transient @volatile private var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _ - @transient @volatile private var cachedFileSize: Long = 0L + // Filter in candidate files based on the col-stats index lookup + val candidateFiles = + baseFileStatuses.filter(fs => + // NOTE: This predicate is true when {@code Option} is empty + candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName))) - @volatile private var queryAsNonePartitionedTable: Boolean = _ + totalFileSize += baseFileStatuses.size + candidateFileSize += candidateFiles.size + 
PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles) + } - refresh0() + logInfo(s"Total base files: ${totalFileSize}; " + + s"candidate files after data skipping : ${candidateFileSize}; " + + s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}") - override def rootPaths: Seq[Path] = queryPath :: Nil + result + } + } - def enableDataSkipping(): Boolean = { - options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), - spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean + private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = { + val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet + allBaseFileNames -- allIndexedFileNames } /** @@ -193,8 +164,8 @@ case class HoodieFileIndex( * "num_nulls" statistics for all clustered columns. * * NOTE: This method has to return complete set of candidate files, since only provided candidates will - * ultimately be scanned as part of query execution. Hence, this method has to maintain the - * invariant of conservatively including every base-file's name, that is NOT referenced in its index. + * ultimately be scanned as part of query execution. Hence, this method has to maintain the + * invariant of conservatively including every base-file's name, that is NOT referenced in its index. * * @param queryFilters list of original data filters passed down from querying engine * @return list of pruned (data-skipped) candidate base-files' names @@ -264,390 +235,38 @@ case class HoodieFileIndex( // files and all outstanding base-files, and make sure that all base files not // represented w/in the index are included in the output of this method val notIndexedFileNames = - lookupFileNamesMissingFromIndex(allIndexedFileNames) + lookupFileNamesMissingFromIndex(allIndexedFileNames) prunedCandidateFileNames ++ notIndexedFileNames }) } - private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = { - val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet - allBaseFileNames -- allIndexedFileNames - } - - /** - * Invoked by Spark to fetch list of latest base files per partition. 
- * - * @param partitionFilters partition column filters - * @param dataFilters data columns filters - * @return list of PartitionDirectory containing partition to base files mapping - */ - override def listFiles(partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - // Look up candidate files names in the col-stats index, if all of the following conditions are true - // - Data-skipping is enabled - // - Col-Stats Index is present - // - List of predicates (filters) is present - val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInColStatsIndex(dataFilters) match { - case Success(opt) => opt - case Failure(e) => - if (e.isInstanceOf[AnalysisException]) { - logDebug("Failed to relay provided data filters to Z-index lookup", e) - } else { - logError("Failed to lookup candidate files in Z-index", e) - } - Option.empty - } - - logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") - - if (queryAsNonePartitionedTable) { - // Read as Non-Partitioned table - // Filter in candidate files based on the col-stats index lookup - val candidateFiles = - allFiles.filter(fileStatus => - // NOTE: This predicate is true when {@code Option} is empty - candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)) - ) - - logInfo(s"Total files : ${allFiles.size}; " + - s"candidate files after data skipping: ${candidateFiles.size}; " + - s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") - - Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) - } else { - // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) - var totalFileSize = 0 - var candidateFileSize = 0 - - val result = prunedPartitions.map { partition => - val baseFileStatuses: Seq[FileStatus] = - cachedAllInputFileSlices(partition) - .map(fs => fs.getBaseFile.orElse(null)) - .filter(_ != null) - .map(_.getFileStatus) - - // Filter in candidate files based on the col-stats index lookup - val candidateFiles = - baseFileStatuses.filter(fs => - // NOTE: This predicate is true when {@code Option} is empty - candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName))) - - totalFileSize += baseFileStatuses.size - candidateFileSize += candidateFiles.size - PartitionDirectory(partition.values, candidateFiles) - } - - logInfo(s"Total base files: ${totalFileSize}; " + - s"candidate files after data skipping : ${candidateFileSize}; " + - s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}") - - result - } - } - - /** - * Fetch list of latest base files and log files per partition. - * - * @param partitionFilters partition column filters - * @param dataFilters data column filters - * @return mapping from string partition paths to its base/log files - */ - def listFileSlices(partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { - if (queryAsNonePartitionedTable) { - // Read as Non-Partitioned table. 
- cachedAllInputFileSlices.map(entry => (entry._1.partitionPath, entry._2)) - } else { - // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) - prunedPartitions.map(partition => { - (partition.partitionPath, cachedAllInputFileSlices(partition)) - }).toMap - } - } - override def inputFiles: Array[String] = { val fileStatusList = allFiles fileStatusList.map(_.getPath.toString).toArray } override def refresh(): Unit = { - fileStatusCache.invalidateAll() - refresh0() - } - - private def refresh0(): Unit = { - val startTime = System.currentTimeMillis() - val partitionFiles = loadPartitionPathFiles() - val allFiles = partitionFiles.values.reduceOption(_ ++ _) - .getOrElse(Array.empty[FileStatus]) - - metaClient.reloadActiveTimeline() - val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants - val latestInstant = activeInstants.lastInstant() - fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) - val queryInstant = if (specifiedQueryInstant.isDefined) { - specifiedQueryInstant - } else if (latestInstant.isPresent) { - Some(latestInstant.get.getTimestamp) - } else { - None - } - - (tableType, queryType) match { - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => - // Fetch and store latest base and log files, and their sizes - cachedAllInputFileSlices = partitionFiles.map(p => { - val latestSlices = if (latestInstant.isPresent) { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get) - .iterator().asScala.toSeq - } else { - Seq() - } - (p._1, latestSlices) - }) - cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum - } else { - fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum - } - }).sum - case (_, _) => - // Fetch and store latest base files and its sizes - cachedAllInputFileSlices = partitionFiles.map(p => { - val fileSlices = specifiedQueryInstant - .map(instant => - fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true)) - .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath)) - .iterator().asScala.toSeq - (p._1, fileSlices) - }) - cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum - } - - // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. - queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty) - val flushSpend = System.currentTimeMillis() - startTime - logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," + - s" spend: $flushSpend ms") - } - - private def fileSliceSize(fileSlice: FileSlice): Long = { - val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileLen + logFileSize - } else { - logFileSize - } + super.refresh() } override def sizeInBytes: Long = { cachedFileSize } +} - override def partitionSchema: StructType = { - if (queryAsNonePartitionedTable) { - // If we read it as Non-Partitioned table, we should not - // return the partition schema. - new StructType() - } else { - _partitionSchemaFromProperties - } - } - - /** - * Get the data schema of the table. 
- * @return - */ - def dataSchema: StructType = { - val partitionColumns = partitionSchema.fields.map(_.name).toSet - StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) - } - - /** - * Returns the FileStatus for all the base files (excluding log files). This should be used only for - * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic - * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter - * to filter out log files within Spark. - * - * @return List of FileStatus for base files - */ - def allFiles: Seq[FileStatus] = { - cachedAllInputFileSlices.values.flatten - .filter(_.getBaseFile.isPresent) - .map(_.getBaseFile.get().getFileStatus) - .toSeq - } - - /** - * Prune the partition by the filter.This implementation is fork from - * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. - * @param partitionPaths All the partition paths. - * @param predicates The filter condition. - * @return The Pruned partition paths. - */ - def prunePartition(partitionPaths: Seq[PartitionRowPath], - predicates: Seq[Expression]): Seq[PartitionRowPath] = { - - val partitionColumnNames = partitionSchema.fields.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate(predicate.transform { - case a: AttributeReference => - val index = partitionSchema.indexWhere(a.name == _.name) - BoundReference(index, partitionSchema(index).dataType, nullable = true) - }) - - val prunedPartitionPaths = partitionPaths.filter { - case PartitionRowPath(values, _) => boundPredicate.eval(values) - } - logInfo(s"Total partition size is: ${partitionPaths.size}," + - s" after partition prune size is: ${prunedPartitionPaths.size}") - prunedPartitionPaths - } else { - partitionPaths - } - } - - def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { - val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) - // Load all the partition path from the basePath, and filter by the query partition path. - // TODO load files from the queryPartitionPath directly. - val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala - .filter(_.startsWith(queryPartitionPath)) - - val partitionSchema = _partitionSchemaFromProperties - val timeZoneId = CaseInsensitiveMap(options) - .get(DateTimeUtils.TIMEZONE_OPTION) - .getOrElse(SQLConf.get.sessionLocalTimeZone) - - val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark - .sessionState.conf) - // Convert partition path to PartitionRowPath - partitionPaths.map { partitionPath => - val partitionRow = if (partitionSchema.fields.length == 0) { - // This is a non-partitioned table - InternalRow.empty - } else { - val partitionFragments = partitionPath.split("/") - - if (partitionFragments.length != partitionSchema.fields.length && - partitionSchema.fields.length == 1) { - // If the partition column size is not equal to the partition fragment size - // and the partition column size is 1, we map the whole partition path - // to the partition column which can benefit from the partition prune. 
- val prefix = s"${partitionSchema.fieldNames.head}=" - val partitionValue = if (partitionPath.startsWith(prefix)) { - // support hive style partition path - partitionPath.substring(prefix.length) - } else { - partitionPath - } - InternalRow.fromSeq(Seq(UTF8String.fromString(partitionValue))) - } else if (partitionFragments.length != partitionSchema.fields.length && - partitionSchema.fields.length > 1) { - // If the partition column size is not equal to the partition fragments size - // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns. So we trait it as a Non-Partitioned Table - // for the query which do not benefit from the partition prune. - logWarning( s"Cannot do the partition prune for table $basePath." + - s"The partitionFragments size (${partitionFragments.mkString(",")})" + - s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})") - InternalRow.empty - } else { // If partitionSeqs.length == partitionSchema.fields.length - - // Append partition name to the partition value if the - // HIVE_STYLE_PARTITIONING is disable. - // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" - val partitionWithName = - partitionFragments.zip(partitionSchema).map { - case (partition, field) => - if (partition.indexOf("=") == -1) { - s"${field.name}=$partition" - } else { - partition - } - }.mkString("/") - val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap - - sparkParsePartitionUtil.parsePartition(pathWithPartitionName, - typeInference = false, Set(new Path(basePath)), partitionDataTypes, - DateTimeUtils.getTimeZone(timeZoneId)) - } - } - PartitionRowPath(partitionRow, partitionPath) - } - } - - /** - * Load all partition paths and it's files under the query table path. - */ - private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { - val partitionRowPaths = getAllQueryPartitionPaths - // List files in all of the partition path. - val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() - val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() - // Fetch from the FileStatusCache - partitionRowPaths.foreach { partitionRowPath => - fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match { - case Some(filesInPartition) => - cachePartitionToFiles.put(partitionRowPath, filesInPartition) - - case None => pathToFetch.append(partitionRowPath) - } - } - - val fetchedPartitionToFiles = - if (pathToFetch.nonEmpty) { - val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap - val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, - fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) - fullPartitionPathsToFetch.map(p => { - (p._1, partitionToFilesMap.get(p._2)) - }) - } else { - Map.empty[PartitionRowPath, Array[FileStatus]] - } +object HoodieFileIndex { - // Update the fileStatusCache - fetchedPartitionToFiles.foreach { - case (partitionRowPath, filesInPartition) => - fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) - } - cachePartitionToFiles.toMap ++ fetchedPartitionToFiles - } - - /** - * Represent a partition path. - * e.g. PartitionPath(InternalRow("2021","02","01"), "2021/02/01")) - * @param values The partition values of this partition path. 
- * @param partitionPath The partition path string. - */ - case class PartitionRowPath(values: InternalRow, partitionPath: String) { - override def equals(other: Any): Boolean = other match { - case PartitionRowPath(_, otherPath) => partitionPath == otherPath - case _ => false - } - - override def hashCode(): Int = { - partitionPath.hashCode - } + def getConfigProperties(spark: SparkSession, options: Map[String, String]) = { + val sqlConf: SQLConf = spark.sessionState.conf + val properties = new TypedProperties() - def fullPartitionPath(basePath: String): Path = { - if (partitionPath.isEmpty) { - new Path(basePath) // This is a non-partition path - } else { - new Path(basePath, partitionPath) - } - } + // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users + // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. + properties.setProperty(HoodieMetadataConfig.ENABLE.key(), + sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(), + HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) + properties.putAll(options.asJava) + properties } } From 28b7742292e54a2133b8dbbdab8284e3924b1ea2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 4 Jan 2022 15:59:16 -0800 Subject: [PATCH 03/22] Fixed refs --- .../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index b30e5731021ea..c4d670bb62f8a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hudi.HoodieSqlCommonUtils @@ -198,9 +199,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, // If convert success to catalyst expression, use the partition prune val fileSlices = if (partitionFilterExpression.isDefined) { - hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get), Seq.empty) + hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get)) } else { - hoodieFileIndex.listFileSlices(Seq.empty, Seq.empty) + hoodieFileIndex.listFileSlices(Seq.empty[Expression]) } if (fileSlices.isEmpty) { @@ -223,6 +224,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala .map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths) + HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }).toList From 5ecd7e226f3a5b3f183db638368391cebad1abe0 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 4 Jan 2022 15:59:34 -0800 
Subject: [PATCH 04/22] Tidying up --- .../java/org/apache/hudi/common/model/FileSlice.java | 4 ++-- .../hudi/hadoop/utils/HoodieInputFormatUtils.java | 10 ++++++---- .../src/main/scala/org/apache/hudi/DefaultSource.scala | 1 - .../apache/hudi/MergeOnReadIncrementalRelation.scala | 3 +-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java index 688e72bd786a2..0fc580db0b657 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java @@ -34,12 +34,12 @@ public class FileSlice implements Serializable { /** * File Group Id of the Slice. */ - private HoodieFileGroupId fileGroupId; + private final HoodieFileGroupId fileGroupId; /** * Point in the timeline, at which the slice was created. */ - private String baseInstantTime; + private final String baseInstantTime; /** * data file, with the compacted data, for this slice. diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 95752bac316c8..a5a3f7e215073 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -421,8 +421,8 @@ public static Map> groupFileStatusForSna } public static Map> groupSnapshotPathsByMetaClient( - Collection metaClientList, - List snapshotPaths + Collection metaClientList, + List snapshotPaths ) { Map> grouped = new HashMap<>(); metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>())); @@ -445,9 +445,11 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map< HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); List returns = new ArrayList<>(); - Map> groupedPaths = HoodieInputFormatUtils - .groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); + Map> groupedPaths = + HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); + Map fsViewCache = new HashMap<>(); + LOG.info("Found a total of " + groupedPaths.size() + " groups"); try { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index a57691f9f5cf4..b15cde597a193 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -193,7 +193,6 @@ class DefaultSource extends RelationProvider } if (useHoodieFileIndex) { - val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, if (schema == null) Option.empty[StructType] else Some(schema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index b4a9800d994b9..f2d289b3c7442 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -181,8 +181,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, val pathGlobPattern = optParams.getOrElse( DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue) - val filteredFileGroup = if(!pathGlobPattern - .equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) { + val filteredFileGroup = if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) { val globMatcher = new GlobPattern("*" + pathGlobPattern) fileGroup.filter(f => { if (f.getLatestFileSlice.get().getBaseFile.isPresent) { From 3e87fc05b13bb61c275d53c8a2922ebd714deaa3 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 17:22:56 -0800 Subject: [PATCH 05/22] `HoodieTableFileIndex` > `AbstractHoodieTableFileIndex` --- .../scala/org/apache/hudi/HoodieFileIndex.scala | 9 ++------- ...x.scala => AbstractHoodieTableFileIndex.scala} | 8 ++++---- .../apache/hudi/SparkHoodieTableFileIndex.scala | 15 +++++++++++---- 3 files changed, 17 insertions(+), 15 deletions(-) rename hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/{HoodieTableFileIndex.scala => AbstractHoodieTableFileIndex.scala} (97%) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index eadd68d7d05cc..12fce162c55b3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -19,10 +19,8 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hudi.HoodieFileIndex.getConfigProperties -import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{And, Expression} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} @@ -57,13 +55,14 @@ import scala.util.{Failure, Success, Try} * path with the partition columns in this case. 
* */ +// TODO rename to HoodieSparkSqlFileIndex case class HoodieFileIndex(spark: SparkSession, metaClient: HoodieTableMetaClient, schemaSpec: Option[StructType], options: Map[String, String], @transient fileStatusCache: FileStatusCache = NoopCache) extends SparkHoodieTableFileIndex( - engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), + spark = spark, metaClient = metaClient, schemaSpec = schemaSpec, configProperties = getConfigProperties(spark, options), @@ -246,10 +245,6 @@ case class HoodieFileIndex(spark: SparkSession, fileStatusList.map(_.getPath.toString).toArray } - override def refresh(): Unit = { - super.refresh() - } - override def sizeInBytes: Long = { cachedFileSize } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala similarity index 97% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala rename to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index 41000a40e2336..c122146cb5c35 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -56,10 +56,10 @@ import scala.collection.mutable * path with the partition columns in this case. * */ -abstract class HoodieTableFileIndex(engineContext: HoodieEngineContext, - metaClient: HoodieTableMetaClient, - configProperties: TypedProperties, - @transient fileStatusCache: FileStatusCache = NoopCache) { +abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, + metaClient: HoodieTableMetaClient, + configProperties: TypedProperties, + @transient fileStatusCache: FileStatusCache = NoopCache) { /** * Get all completeCommits. 
*/ diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 33cbdf7f3c748..a70f1497799bc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -18,10 +18,13 @@ package org.apache.hudi import org.apache.hadoop.fs.Path +import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, expressions} @@ -29,16 +32,20 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundRefer import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructField, StructType} -// TODO merge w/ HoodieFileIndex +// TODO unify w/ HoodieFileIndex class SparkHoodieTableFileIndex(spark: SparkSession, - engineContext: HoodieEngineContext, metaClient: HoodieTableMetaClient, schemaSpec: Option[StructType], configProperties: TypedProperties, @transient fileStatusCache: FileStatusCache = NoopCache) - extends HoodieTableFileIndex(engineContext, metaClient, configProperties, fileStatusCache) + extends AbstractHoodieTableFileIndex( + engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), + metaClient, + configProperties, + fileStatusCache + ) with SparkAdapterSupport with Logging { From 6ae85f8ebce2b62b2c9ef602fc9a980f06b02961 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 17:24:21 -0800 Subject: [PATCH 06/22] Carrying over changes lost during rebase (7d046f, 1f7afb) --- .../hudi/SparkHoodieTableFileIndex.scala | 36 +++++++++++++++---- .../Spark3ParsePartitionUtil.scala | 2 +- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index a70f1497799bc..ddb06b991ba55 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -65,14 +65,19 @@ class SparkHoodieTableFileIndex(spark: SparkSession, private lazy val _partitionSchemaFromProperties: StructType = { val tableConfig = metaClient.getTableConfig val partitionColumns = tableConfig.getPartitionFields - - val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap + val nameFieldMap = generateNameFieldMap(Right(schema)) if (partitionColumns.isPresent) { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot 
find column: '" + - s"$column' in the schema[${schema.fields.mkString(",")}]"))) - new StructType(partitionFields) + if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName) + || tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) { + val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) + StructType(partitionFields) + } else { + val partitionFields = partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + StructType(partitionFields) + } } else { // If the partition columns have not stored in hoodie.properties(the table that was // created earlier), we trait it as a non-partitioned table. logWarning("No partition columns available from hoodie.properties." + @@ -165,4 +170,23 @@ class SparkHoodieTableFileIndex(spark: SparkSession, ) .map(pv => pv.columnNames.zip(pv.literals).map(t => Tuple2(t._1, t._2.value))) } + + /** + * This method traverses StructType recursively to build map of columnName -> StructField + * Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding + * only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"] + * @param structField + * @return map of ( columns names -> StructField ) + */ + private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = { + structField match { + case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap + case Left(field) => field.dataType match { + case struct: StructType => generateNameFieldMap(Right(struct)).map { + case (key: String, sf: StructField) => (field.name + "." 
+ key, sf) + } + case _ => Map(field.name -> field) + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala index 1d23a84bc089e..6f29053aef212 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -143,7 +143,7 @@ class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil { (None, Some(path)) } else { val (columnNames, values) = columns.reverse.unzip - (Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath)) + (Some(PartitionValues(columnNames, values)), Some(currentPath)) } } From 1a56efaa17358b0a889ae9eb366501fe08a4a641 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 17:35:09 -0800 Subject: [PATCH 07/22] Fixed partitioning columns parsing after recent API changes --- .../apache/hudi/AbstractHoodieTableFileIndex.scala | 12 ++++++------ .../org/apache/hudi/SparkHoodieTableFileIndex.scala | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index c122146cb5c35..40b82833bac09 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -184,10 +184,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, } }.mkString("/") val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionValues = parsePartitionValuesFromPath(pathWithPartitionName) - .getOrElse(Seq()) - .map(_._2) - .toArray + val partitionValues = parsePartitionValuesFromPath(pathWithPartitionName).toArray partitionValues } @@ -310,8 +307,11 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, s" spend: $flushSpend ms") } - // TODO java-doc - protected def parsePartitionValuesFromPath(pathWithPartitionName: Path): Option[Seq[(String, Any)]] + /** + * Parses partitioning columns' values from the provided path to the partition + * @param partitionPath partition's path to parse partitioning columns' values from + */ + protected def parsePartitionValuesFromPath(partitionPath: Path): Seq[Any] // TODO eval whether we should just use logger directly protected def logWarning(str: => String): Unit diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index ddb06b991ba55..1b67968858836 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -156,19 +156,19 @@ class SparkHoodieTableFileIndex(spark: SparkSession, } } - override protected def parsePartitionValuesFromPath(pathWithPartitionName: Path): Option[Seq[(String, Any)]] = { + override protected def 
parsePartitionValuesFromPath(partitionPath: Path): Seq[Any] = { val timeZoneId = Option.apply(configProperties.getString(DateTimeUtils.TIMEZONE_OPTION)) .getOrElse(SQLConf.get.sessionLocalTimeZone) val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap sparkParsePartitionUtil.parsePartition( - pathWithPartitionName, + partitionPath, typeInference = false, Set(new Path(basePath)), partitionDataTypes, DateTimeUtils.getTimeZone(timeZoneId) ) - .map(pv => pv.columnNames.zip(pv.literals).map(t => Tuple2(t._1, t._2.value))) + .toSeq(partitionSchema) } /** From 53761cec697902deeeed931453f1b26b387ffc19 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 17:42:23 -0800 Subject: [PATCH 08/22] Fixed compilation --- .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 12fce162c55b3..5dcd9712dadfd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -240,6 +240,8 @@ case class HoodieFileIndex(spark: SparkSession, }) } + override def refresh(): Unit = super.refresh() + override def inputFiles: Array[String] = { val fileStatusList = allFiles fileStatusList.map(_.getPath.toString).toArray From 1a204a515ada17e9a4f1b9184b4ee7d557947283 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 19:00:06 -0800 Subject: [PATCH 09/22] Fixed incorrect instant being queried by --- .../scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index 40b82833bac09..80d379d2e308c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -270,8 +270,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => // Fetch and store latest base and log files, and their sizes cachedAllInputFileSlices = partitionFiles.map(p => { - // TODO conditional is incorrect -- this should be queryInstant - val latestSlices = if (latestInstant.isPresent) { + val latestSlices = if (queryInstant.isDefined) { fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get) .iterator().asScala.toSeq } else { From 61cec5cdf0a07fc3cbe6e69676ce8115a8ba5628 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 19:19:35 -0800 Subject: [PATCH 10/22] Tidying up --- .../hudi/MergeOnReadIncrementalRelation.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index f2d289b3c7442..bc83a85415de2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -168,13 +168,13 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus) // Iterate partitions to create splits - val fileGroup = getWritePartitionPaths(metadataList).flatMap(partitionPath => + val fileGroups = getWritePartitionPaths(metadataList).flatMap(partitionPath => fsView.getAllFileGroups(partitionPath).iterator() ).toList - val latestCommit = fsView.getLastInstant.get().getTimestamp + val latestCommit = fsView.getLastInstant.get.getTimestamp if (log.isDebugEnabled) { - fileGroup.foreach(f => log.debug(s"current file group id: " + - s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}")) + fileGroups.foreach(f => log.debug(s"current file group id: " + + s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}")) } // Filter files based on user defined glob pattern @@ -183,15 +183,16 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue) val filteredFileGroup = if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) { val globMatcher = new GlobPattern("*" + pathGlobPattern) - fileGroup.filter(f => { - if (f.getLatestFileSlice.get().getBaseFile.isPresent) { - globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath) + fileGroups.filter(fg => { + val latestFileSlice = fg.getLatestFileSlice.get + if (latestFileSlice.getBaseFile.isPresent) { + globMatcher.matches(latestFileSlice.getBaseFile.get.getPath) } else { - globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString) + globMatcher.matches(latestFileSlice.getLatestLogFile.get.getPath.toString) } }) } else { - fileGroup + fileGroups } // Build HoodieMergeOnReadFileSplit. 
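Note on the glob-based filtering in the hunk above: the incremental relation keeps only those file groups whose latest base (or log) file path matches the user-supplied INCR_PATH_GLOB pattern, prefixed with "*" so that the relative glob can match anywhere inside an absolute path rooted at the table's base path. The following standalone sketch (not part of the patch; the object name, sample paths and pattern are made up for illustration) shows the same Hadoop GlobPattern matching in isolation:

import org.apache.hadoop.fs.GlobPattern

// Minimal sketch of INCR_PATH_GLOB-style filtering (illustrative names/paths only).
object IncrPathGlobFilterSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical value a user could pass as the incremental path glob option
    val pathGlobPattern = "/americas/*/*.parquet"
    // Same trick as in the hunk above: the "*" prefix lets the relative pattern
    // match absolute base-file paths that start with the table's base path.
    val globMatcher = new GlobPattern("*" + pathGlobPattern)

    // Made-up base-file paths standing in for the latest file slices' base files
    val baseFilePaths = Seq(
      "/tmp/hudi_trips/americas/brazil/f1_1-0-1_20220105183012.parquet",
      "/tmp/hudi_trips/asia/india/f2_1-0-1_20220105183012.parquet")

    // Keeps only the path under "americas", mirroring the filteredFileGroup computation
    val matched = baseFilePaths.filter(p => globMatcher.matches(p))
    matched.foreach(println)
  }
}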
From f23cc60b8abaffa124d92b87b7cfce1a7b991052 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 20:23:47 -0800 Subject: [PATCH 11/22] Cleaned up field-map generation seq --- .../hudi/SparkHoodieTableFileIndex.scala | 60 +++++++++++++------ 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 1b67968858836..2e785ce86ec9f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -18,19 +18,19 @@ package org.apache.hudi import org.apache.hadoop.fs.Path +import org.apache.hudi.SparkHoodieTableFileIndex.generateFieldMap import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.TypedProperties -import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache, SparkParsePartitionUtil} +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -65,7 +65,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, private lazy val _partitionSchemaFromProperties: StructType = { val tableConfig = metaClient.getTableConfig val partitionColumns = tableConfig.getPartitionFields - val nameFieldMap = generateNameFieldMap(Right(schema)) + val nameFieldMap = generateFieldMap(schema) if (partitionColumns.isPresent) { if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName) @@ -78,7 +78,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession, s"$column' in the schema[${schema.fields.mkString(",")}]"))) StructType(partitionFields) } - } else { // If the partition columns have not stored in hoodie.properties(the table that was + } else { + // If the partition columns have not stored in hoodie.properties(the table that was // created earlier), we trait it as a non-partitioned table. logWarning("No partition columns available from hoodie.properties." 
+ " Partition pruning will not work") @@ -170,23 +171,46 @@ class SparkHoodieTableFileIndex(spark: SparkSession, ) .toSeq(partitionSchema) } +} + +object SparkHoodieTableFileIndex { /** - * This method traverses StructType recursively to build map of columnName -> StructField - * Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding - * only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"] - * @param structField - * @return map of ( columns names -> StructField ) + * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding + * [[StructField]] object for every field of the provided [[StructType]], recursively. + * + * For example, following struct + *
+   *   StructType(
+   *     StructField("a",
+   *       StructType(
+   *          StructField("b", StringType),
+   *          StructField("c", IntType)
+   *       )
+   *     )
+   *   )
+   * </pre>
+   *
+   * will be converted into following mapping:
+   *
+   * <pre>
+   *   "a.b" -> StructField("b", StringType),
+   *   "a.c" -> StructField("c", IntType),
+   * 
*/ - private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = { - structField match { - case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap - case Left(field) => field.dataType match { - case struct: StructType => generateNameFieldMap(Right(struct)).map { - case (key: String, sf: StructField) => (field.name + "." + key, sf) + private def generateFieldMap(structType: StructType) : Map[String, StructField] = { + def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = { + structField match { + case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap + case Left(field) => field.dataType match { + case struct: StructType => traverse(Right(struct)).map { + case (key, structField) => (s"${field.name}.$key", structField) + } + case _ => Map(field.name -> field) } - case _ => Map(field.name -> field) } } + + traverse(Right(structType)) } -} +} \ No newline at end of file From 3aee47d3c4f16725f1ce1e0ed0b1b05e7cae5705 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 20:47:05 -0800 Subject: [PATCH 12/22] Cleaned up `PartitionPath` abstraction, preparing it to be split into engine-specific impls --- .../hudi/AbstractHoodieTableFileIndex.scala | 266 +++++++++--------- .../hudi/SparkHoodieTableFileIndex.scala | 8 +- .../org/apache/hudi/TestHoodieFileIndex.scala | 2 +- 3 files changed, 144 insertions(+), 132 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index 80d379d2e308c..b09fc2f4d21ee 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.unsafe.types.UTF8String +import java.nio.charset.StandardCharsets import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable @@ -90,7 +91,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, @transient @volatile protected var cachedFileSize: Long = 0L @transient - @volatile protected var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _ + @volatile protected var cachedAllInputFileSlices: Map[PartitionPath, Seq[FileSlice]] = _ @volatile protected var queryAsNonePartitionedTable: Boolean = _ @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ @@ -105,10 +106,10 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, def listFileSlices(): Map[String, Seq[FileSlice]] = { if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table. 
- cachedAllInputFileSlices.map(entry => (entry._1.partitionPath, entry._2)) + cachedAllInputFileSlices.map(entry => (entry._1.path, entry._2)) } else { cachedAllInputFileSlices.keys.toSeq.map(partition => { - (partition.partitionPath, cachedAllInputFileSlices(partition)) + (partition.path, cachedAllInputFileSlices(partition)) }).toMap } } @@ -128,7 +129,120 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, .toSeq } - def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { + private def refresh0(): Unit = { + val startTime = System.currentTimeMillis() + val partitionFiles = loadPartitionPathFiles() + val allFiles = partitionFiles.values.reduceOption(_ ++ _) + .getOrElse(Array.empty[FileStatus]) + + metaClient.reloadActiveTimeline() + val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + val latestInstant = activeInstants.lastInstant() + // TODO we can optimize the flow by: + // - First fetch list of files from instants of interest + // - Load FileStatus's + fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) + val queryInstant = if (specifiedQueryInstant.isDefined) { + specifiedQueryInstant + } else if (latestInstant.isPresent) { + Some(latestInstant.get.getTimestamp) + } else { + None + } + + (tableType, queryType) match { + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => + // Fetch and store latest base and log files, and their sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + val latestSlices = if (queryInstant.isDefined) { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.path, queryInstant.get) + .iterator().asScala.toSeq + } else { + Seq() + } + (p._1, latestSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } else { + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } + }).sum + case (_, _) => + // Fetch and store latest base files and its sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + val fileSlices = specifiedQueryInstant + .map(instant => + fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.path, instant, true)) + .getOrElse(fileSystemView.getLatestFileSlices(p._1.path)) + .iterator().asScala.toSeq + (p._1, fileSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum + } + + // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. + queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values.isEmpty) + val flushSpend = System.currentTimeMillis() - startTime + + logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," + + s" spend: $flushSpend ms") + } + + protected def refresh(): Unit = { + fileStatusCache.invalidateAll() + refresh0() + } + + private def fileSliceSize(fileSlice: FileSlice): Long = { + val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + logFileSize + } else { + logFileSize + } + } + + /** + * Load all partition paths and it's files under the query table path. + */ + private def loadPartitionPathFiles(): Map[PartitionPath, Array[FileStatus]] = { + val partitionPaths = getAllQueryPartitionPaths + // List files in all of the partition path. 
+ val pathToFetch = mutable.ArrayBuffer[PartitionPath]() + val cachePartitionToFiles = mutable.Map[PartitionPath, Array[FileStatus]]() + // Fetch from the FileStatusCache + partitionPaths.foreach { partitionPath => + fileStatusCache.getLeafFiles(partitionPath.fullPartitionPath(basePath)) match { + case Some(filesInPartition) => + cachePartitionToFiles.put(partitionPath, filesInPartition) + + case None => pathToFetch.append(partitionPath) + } + } + + val fetchedPartitionToFiles = + if (pathToFetch.nonEmpty) { + val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap + val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, + fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) + fullPartitionPathsToFetch.map(p => { + (p._1, partitionToFilesMap.get(p._2)) + }) + } else { + Map.empty[PartitionPath, Array[FileStatus]] + } + + // Update the fileStatusCache + fetchedPartitionToFiles.foreach { + case (partitionRowPath, filesInPartition) => + fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) + } + cachePartitionToFiles.toMap ++ fetchedPartitionToFiles + } + + def getAllQueryPartitionPaths: Seq[PartitionPath] = { val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) // Load all the partition path from the basePath, and filter by the query partition path. // TODO load files from the queryPartitionPath directly. @@ -189,123 +303,10 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, partitionValues } } - PartitionRowPath(partitionRow, partitionPath) + PartitionPath(partitionPath, partitionRow) } } - protected def refresh(): Unit = { - fileStatusCache.invalidateAll() - refresh0() - } - - private def fileSliceSize(fileSlice: FileSlice): Long = { - val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileLen + logFileSize - } else { - logFileSize - } - } - - /** - * Load all partition paths and it's files under the query table path. - */ - private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { - val partitionRowPaths = getAllQueryPartitionPaths - // List files in all of the partition path. 
- val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() - val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() - // Fetch from the FileStatusCache - partitionRowPaths.foreach { partitionRowPath => - fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match { - case Some(filesInPartition) => - cachePartitionToFiles.put(partitionRowPath, filesInPartition) - - case None => pathToFetch.append(partitionRowPath) - } - } - - val fetchedPartitionToFiles = - if (pathToFetch.nonEmpty) { - val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap - val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, - fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) - fullPartitionPathsToFetch.map(p => { - (p._1, partitionToFilesMap.get(p._2)) - }) - } else { - Map.empty[PartitionRowPath, Array[FileStatus]] - } - - // Update the fileStatusCache - fetchedPartitionToFiles.foreach { - case (partitionRowPath, filesInPartition) => - fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) - } - cachePartitionToFiles.toMap ++ fetchedPartitionToFiles - } - - private def refresh0(): Unit = { - val startTime = System.currentTimeMillis() - val partitionFiles = loadPartitionPathFiles() - val allFiles = partitionFiles.values.reduceOption(_ ++ _) - .getOrElse(Array.empty[FileStatus]) - - metaClient.reloadActiveTimeline() - val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants - val latestInstant = activeInstants.lastInstant() - // TODO we can optimize the flow by: - // - First fetch list of files from instants of interest - // - Load FileStatus's - fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) - val queryInstant = if (specifiedQueryInstant.isDefined) { - specifiedQueryInstant - } else if (latestInstant.isPresent) { - Some(latestInstant.get.getTimestamp) - } else { - None - } - - (tableType, queryType) match { - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => - // Fetch and store latest base and log files, and their sizes - cachedAllInputFileSlices = partitionFiles.map(p => { - val latestSlices = if (queryInstant.isDefined) { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get) - .iterator().asScala.toSeq - } else { - Seq() - } - (p._1, latestSlices) - }) - cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum - } else { - fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum - } - }).sum - case (_, _) => - // Fetch and store latest base files and its sizes - cachedAllInputFileSlices = partitionFiles.map(p => { - val fileSlices = specifiedQueryInstant - .map(instant => - fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true)) - .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath)) - .iterator().asScala.toSeq - (p._1, fileSlices) - }) - cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum - } - - // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. 
- queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values.isEmpty) - val flushSpend = System.currentTimeMillis() - startTime - - logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," + - s" spend: $flushSpend ms") - } - /** * Parses partitioning columns' values from the provided path to the partition * @param partitionPath partition's path to parse partitioning columns' values from @@ -317,27 +318,38 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, protected def logInfo(str: => String): Unit /** - * Represent a partition path. - * e.g. PartitionPath(Array("2021","02","01"), "2021/02/01")) + * Represents a partition as a tuple of + *
+   * <ul>
+   *   <li>Actual partition path (relative to the table's base path)</li>
+   *   <li>Values of the corresponding columns table is being partitioned by (partitioning columns)</li>
+   * </ul>
+ * + * E.g. PartitionPath("2021/02/01", Array("2021","02","01")) + * + * NOTE: Partitioning column values might have engine specific representation (for ex, + * {@code UTF8String} for Spark, etc) and are solely used in partition pruning in an very + * engine-specific ways + * + * @param values values of the corresponding partitioning columns + * @param path partition's path * - * @param values The partition values of this partition path. - * @param partitionPath The partition path string. + * TODO expose as a trait and make impls engine-specific (current impl is tailored for Spark) */ - case class PartitionRowPath(values: Array[Any], partitionPath: String) { + case class PartitionPath(path: String, values: Array[Any]) { override def equals(other: Any): Boolean = other match { - case PartitionRowPath(_, otherPath) => partitionPath == otherPath + case PartitionPath(otherPath, _) => path == otherPath case _ => false } override def hashCode(): Int = { - partitionPath.hashCode + path.hashCode } def fullPartitionPath(basePath: String): Path = { - if (partitionPath.isEmpty) { + if (path.isEmpty) { new Path(basePath) // This is a non-partition path } else { - new Path(basePath, partitionPath) + new Path(basePath, path) } } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 2e785ce86ec9f..1b42dba78e483 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -118,7 +118,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, // Prune the partition path by the partition filters val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) prunedPartitions.map(partition => { - (partition.partitionPath, cachedAllInputFileSlices(partition)) + (partition.path, cachedAllInputFileSlices(partition)) }).toMap } @@ -130,8 +130,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession, * @param predicates The filter condition. * @return The Pruned partition paths. 
*/ - def prunePartition(partitionPaths: Seq[PartitionRowPath], - predicates: Seq[Expression]): Seq[PartitionRowPath] = { + def prunePartition(partitionPaths: Seq[PartitionPath], + predicates: Seq[Expression]): Seq[PartitionPath] = { val partitionColumnNames = partitionSchema.fields.map(_.name).toSet val partitionPruningPredicates = predicates.filter { @@ -147,7 +147,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, }) val prunedPartitionPaths = partitionPaths.filter { - case PartitionRowPath(values, _) => boundPredicate.eval(InternalRow.fromSeq(values)) + case PartitionPath(_, values) => boundPredicate.eval(InternalRow.fromSeq(values)) } logInfo(s"Total partition size is: ${partitionPaths.size}," + s" after partition prune size is: ${prunedPartitionPaths.size}") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 62f98cf53617d..d4f7cede7f6d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -273,7 +273,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString)) // test if table is partitioned on nested columns, getAllQueryPartitionPaths does not break - assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c")) + assert(fileIndex.getAllQueryPartitionPaths.get(0).path.equals("c")) } private def attribute(partition: String): AttributeReference = { From a22bd6a541ae7361a29e920bc683163de2887045 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 21:07:12 -0800 Subject: [PATCH 13/22] Abstracted `parsePartitionRow` to be defined in engine specific impls --- .../hudi/AbstractHoodieTableFileIndex.scala | 61 ++----------------- .../hudi/SparkHoodieTableFileIndex.scala | 56 ++++++++++++++++- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index b09fc2f4d21ee..26735dd066abb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -28,9 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} import org.apache.spark.sql.hudi.HoodieSqlUtils -import org.apache.spark.unsafe.types.UTF8String -import java.nio.charset.StandardCharsets import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable @@ -251,67 +249,20 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, val partitionSchema = _partitionColumns - // Convert partition path to PartitionRowPath + // Convert partition's path into partition descriptor partitionPaths.map { partitionPath => - val partitionRow: Array[Any] = if (partitionSchema.length == 0) { - // This is a non-partitioned table - Array.empty - } else { - val partitionFragments = 
partitionPath.split("/") - - if (partitionFragments.length != partitionSchema.length && - partitionSchema.length == 1) { - // If the partition column size is not equal to the partition fragment size - // and the partition column size is 1, we map the whole partition path - // to the partition column which can benefit from the partition prune. - val prefix = s"${partitionSchema.head}=" - val partitionValue = if (partitionPath.startsWith(prefix)) { - // support hive style partition path - partitionPath.substring(prefix.length) - } else { - partitionPath - } - // TODO replace w/ byte array - Array(UTF8String.fromString(partitionValue)) - } else if (partitionFragments.length != partitionSchema.length && - partitionSchema.length > 1) { - // If the partition column size is not equal to the partition fragments size - // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns. So we trait it as a Non-Partitioned Table - // for the query which do not benefit from the partition prune. - logWarning(s"Cannot do the partition prune for table $basePath." + - s"The partitionFragments size (${partitionFragments.mkString(",")})" + - s" is not equal to the partition columns size(${partitionSchema.mkString(",")})") - Array.empty - } else { // If partitionSeqs.length == partitionSchema.fields.length - - // Append partition name to the partition value if the - // HIVE_STYLE_PARTITIONING is disable. - // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" - val partitionWithName = - partitionFragments.zip(partitionSchema).map { - case (partition, columnName) => - if (partition.indexOf("=") == -1) { - s"${columnName}=$partition" - } else { - partition - } - }.mkString("/") - val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionValues = parsePartitionValuesFromPath(pathWithPartitionName).toArray - - partitionValues - } - } + val partitionRow = parsePartitionRow(partitionSchema, partitionPath) PartitionPath(partitionPath, partitionRow) } } /** - * Parses partitioning columns' values from the provided path to the partition + * Parses partition columns' values from the provided partition's path + * + * @param partitionColumns partitioning columns identifying the partition * @param partitionPath partition's path to parse partitioning columns' values from */ - protected def parsePartitionValuesFromPath(partitionPath: Path): Seq[Any] + protected def parsePartitionRow(partitionColumns: Array[String], partitionPath: String): Array[Any] // TODO eval whether we should just use logger directly protected def logWarning(str: => String): Unit diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 1b42dba78e483..b097072371afa 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String // TODO unify w/ HoodieFileIndex class SparkHoodieTableFileIndex(spark: SparkSession, @@ -157,7 +158,60 @@ class 
SparkHoodieTableFileIndex(spark: SparkSession, } } - override protected def parsePartitionValuesFromPath(partitionPath: Path): Seq[Any] = { + protected def parsePartitionRow(partitionColumns: Array[String], partitionPath: String): Array[Any] = { + if (partitionColumns.length == 0) { + // This is a non-partitioned table + Array.empty + } else { + val partitionFragments = partitionPath.split("/") + + if (partitionFragments.length != partitionColumns.length && + partitionColumns.length == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. + val prefix = s"${partitionColumns.head}=" + val partitionValue = if (partitionPath.startsWith(prefix)) { + // support hive style partition path + partitionPath.substring(prefix.length) + } else { + partitionPath + } + Array(UTF8String.fromString(partitionValue)) + } else if (partitionFragments.length != partitionColumns.length && + partitionColumns.length > 1) { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns. So we trait it as a Non-Partitioned Table + // for the query which do not benefit from the partition prune. + logWarning(s"Cannot do the partition prune for table $basePath." + + s"The partitionFragments size (${partitionFragments.mkString(",")})" + + s" is not equal to the partition columns size(${partitionColumns.mkString(",")})") + Array.empty + } else { + // If partitionSeqs.length == partitionSchema.fields.length + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING is disable. + // e.g. 
convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + val partitionWithName = + partitionFragments.zip(partitionColumns).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") + + val pathWithPartitionName = new Path(basePath, partitionWithName) + val partitionValues = parsePartitionColumnValues(pathWithPartitionName, partitionSchema) + + partitionValues.toArray + } + } + } + + private def parsePartitionColumnValues(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { val timeZoneId = Option.apply(configProperties.getString(DateTimeUtils.TIMEZONE_OPTION)) .getOrElse(SQLConf.get.sessionLocalTimeZone) val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap From 425853ffe25a3710fd9f5fd0f2a177bb0ff8f71c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 21:11:30 -0800 Subject: [PATCH 14/22] Tidying up --- .../org/apache/hudi/AbstractHoodieTableFileIndex.scala | 9 +++++---- .../org/apache/hudi/SparkHoodieTableFileIndex.scala | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index 26735dd066abb..aa64be78728cf 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -251,18 +251,19 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, // Convert partition's path into partition descriptor partitionPaths.map { partitionPath => - val partitionRow = parsePartitionRow(partitionSchema, partitionPath) - PartitionPath(partitionPath, partitionRow) + val partitionColumnValues = parsePartitionColumnValues(partitionSchema, partitionPath) + PartitionPath(partitionPath, partitionColumnValues) } } /** - * Parses partition columns' values from the provided partition's path + * Parses partition columns' values from the provided partition's path, returning list of + * values (that might have engine-specific representation) * * @param partitionColumns partitioning columns identifying the partition * @param partitionPath partition's path to parse partitioning columns' values from */ - protected def parsePartitionRow(partitionColumns: Array[String], partitionPath: String): Array[Any] + protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] // TODO eval whether we should just use logger directly protected def logWarning(str: => String): Unit diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index b097072371afa..aff6c26d10a06 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -158,7 +158,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, } } - protected def parsePartitionRow(partitionColumns: Array[String], partitionPath: String): Array[Any] = { + protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] = { if (partitionColumns.length == 
0) { // This is a non-partitioned table Array.empty @@ -204,14 +204,14 @@ class SparkHoodieTableFileIndex(spark: SparkSession, }.mkString("/") val pathWithPartitionName = new Path(basePath, partitionWithName) - val partitionValues = parsePartitionColumnValues(pathWithPartitionName, partitionSchema) + val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema) partitionValues.toArray } } } - private def parsePartitionColumnValues(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { + private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { val timeZoneId = Option.apply(configProperties.getString(DateTimeUtils.TIMEZONE_OPTION)) .getOrElse(SQLConf.get.sessionLocalTimeZone) val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap From 21158d88df338868d37f1fde612f69137b29242e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 5 Jan 2022 21:19:55 -0800 Subject: [PATCH 15/22] Abstracted `specifiedQueryInstant` to be provided by engine-specific impl --- .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala | 1 + .../org/apache/hudi/AbstractHoodieTableFileIndex.scala | 9 +++------ .../org/apache/hudi/SparkHoodieTableFileIndex.scala | 2 ++ 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 5dcd9712dadfd..80dfcc96e9538 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -66,6 +66,7 @@ case class HoodieFileIndex(spark: SparkSession, metaClient = metaClient, schemaSpec = schemaSpec, configProperties = getConfigProperties(spark, options), + specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlUtils.formatQueryInstant), fileStatusCache = fileStatusCache ) with FileIndex { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index aa64be78728cf..ce3501e7cfdcc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -50,14 +50,15 @@ import scala.collection.mutable * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). * * 3、Else the the partition columns size is not equal to the partition directory level and the - * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12") - * , we read it as a Non-Partitioned table because we cannot know how to mapping the partition + * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12"), + * we read it as a Non-Partitioned table because we cannot know how to mapping the partition * path with the partition columns in this case. * */ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, metaClient: HoodieTableMetaClient, configProperties: TypedProperties, + specifiedQueryInstant: Option[String] = None, @transient fileStatusCache: FileStatusCache = NoopCache) { /** * Get all completeCommits. 
@@ -81,10 +82,6 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, private val queryType = configProperties(QUERY_TYPE.key()) private val tableType = metaClient.getTableType - private val specifiedQueryInstant = - Option.apply(configProperties.getString(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)) - .map(HoodieSqlUtils.formatQueryInstant) - @transient private val queryPath = new Path(configProperties.getOrElse("path", "'path' option required")) @transient @volatile protected var cachedFileSize: Long = 0L diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index aff6c26d10a06..aa5b7aa2d49e4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -40,11 +40,13 @@ class SparkHoodieTableFileIndex(spark: SparkSession, metaClient: HoodieTableMetaClient, schemaSpec: Option[StructType], configProperties: TypedProperties, + specifiedQueryInstant: Option[String] = None, @transient fileStatusCache: FileStatusCache = NoopCache) extends AbstractHoodieTableFileIndex( engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), metaClient, configProperties, + specifiedQueryInstant, fileStatusCache ) with SparkAdapterSupport From ab0726c33120041e7c77553b5c862d5b9d1728a1 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 6 Jan 2022 16:28:36 -0800 Subject: [PATCH 16/22] Tidying up --- .../scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index ce3501e7cfdcc..4bd3073718cc4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} -import org.apache.spark.sql.hudi.HoodieSqlUtils import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ From 9c076d7a5f1ba60240381bf0ba0712eee5cbab69 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 6 Jan 2022 16:40:24 -0800 Subject: [PATCH 17/22] Removed dep on Spark's `FileStatusCache` from `AbstractHoodieTableFileIndex` --- .../hudi/AbstractHoodieTableFileIndex.scala | 15 ++++++++++----- .../apache/hudi/SparkHoodieTableFileIndex.scala | 12 ++++++++++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index 4bd3073718cc4..e0b302006ac4d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} -import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -58,7 +57,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, metaClient: HoodieTableMetaClient, configProperties: TypedProperties, specifiedQueryInstant: Option[String] = None, - @transient fileStatusCache: FileStatusCache = NoopCache) { + @transient fileStatusCache: FileStatusCacheTrait) { /** * Get all completeCommits. */ @@ -185,7 +184,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, } protected def refresh(): Unit = { - fileStatusCache.invalidateAll() + fileStatusCache.invalidate() refresh0() } @@ -208,7 +207,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, val cachePartitionToFiles = mutable.Map[PartitionPath, Array[FileStatus]]() // Fetch from the FileStatusCache partitionPaths.foreach { partitionPath => - fileStatusCache.getLeafFiles(partitionPath.fullPartitionPath(basePath)) match { + fileStatusCache.get(partitionPath.fullPartitionPath(basePath)) match { case Some(filesInPartition) => cachePartitionToFiles.put(partitionPath, filesInPartition) @@ -231,7 +230,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, // Update the fileStatusCache fetchedPartitionToFiles.foreach { case (partitionRowPath, filesInPartition) => - fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) + fileStatusCache.put(partitionRowPath.fullPartitionPath(basePath), filesInPartition) } cachePartitionToFiles.toMap ++ fetchedPartitionToFiles } @@ -302,3 +301,9 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, } } } + +trait FileStatusCacheTrait { + def get(path: Path): Option[Array[FileStatus]] + def put(path: Path, leafFiles: Array[FileStatus]): Unit + def invalidate(): Unit +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index aa5b7aa2d49e4..e627080e1ff31 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -17,7 +17,7 @@ package org.apache.hudi -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hudi.SparkHoodieTableFileIndex.generateFieldMap import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.TypedProperties @@ -47,7 +47,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, metaClient, configProperties, specifiedQueryInstant, - fileStatusCache + SparkHoodieTableFileIndex.adapt(fileStatusCache) ) with SparkAdapterSupport with Logging { @@ -269,4 +269,12 @@ object SparkHoodieTableFileIndex { traverse(Right(structType)) } + + private def adapt(cache: FileStatusCache): FileStatusCacheTrait = { + new FileStatusCacheTrait { + override def get(path: 
Path): Option[Array[FileStatus]] = cache.getLeafFiles(path) + override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.putLeafFiles(path, leafFiles) + override def invalidate(): Unit = cache.invalidateAll() + } + } } \ No newline at end of file From 27d50aea947dadd08d5db6a44fc75f60d695faa2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 6 Jan 2022 20:35:07 -0800 Subject: [PATCH 18/22] `lint` --- .../main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index e627080e1ff31..fc06f57659db5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -277,4 +277,4 @@ object SparkHoodieTableFileIndex { override def invalidate(): Unit = cache.invalidateAll() } } -} \ No newline at end of file +} From ac704919f2878cb56d5363aae8492a090a9c337c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Jan 2022 12:05:28 -0800 Subject: [PATCH 19/22] Fixed tests --- .../main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index fc06f57659db5..bfcbb1dbb8c82 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -214,8 +214,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, } private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = { - val timeZoneId = Option.apply(configProperties.getString(DateTimeUtils.TIMEZONE_OPTION)) - .getOrElse(SQLConf.get.sessionLocalTimeZone) + val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone) val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap sparkParsePartitionUtil.parsePartition( From 9cde9d8e79e8bba1510afc84942dbc06e34930d9 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 14 Jan 2022 10:20:56 -0800 Subject: [PATCH 20/22] Tidying up --- .../scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index e0b302006ac4d..9138b60619d5c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -63,9 +63,7 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, */ lazy val completedCommits = metaClient.getCommitsTimeline .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp) - /** - * Get the partition schema from the hoodie.properties. 
- */ + private lazy val _partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]()) From d3e8360db0aba8353e8666574dee0762b565186b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 13 Jan 2022 19:48:24 -0800 Subject: [PATCH 21/22] Tidying up java-docs # Conflicts: # hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java --- .../org/apache/hudi/HoodieFileIndex.scala | 17 ++++++- .../hudi/AbstractHoodieTableFileIndex.scala | 48 ++++++------------- .../hudi/SparkHoodieTableFileIndex.scala | 11 ++++- 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 80dfcc96e9538..a38d0c5c7202f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -54,8 +54,8 @@ import scala.util.{Failure, Success, Try} * , we read it as a Non-Partitioned table because we cannot know how to mapping the partition * path with the partition columns in this case. * + * TODO rename to HoodieSparkSqlFileIndex */ -// TODO rename to HoodieSparkSqlFileIndex case class HoodieFileIndex(spark: SparkSession, metaClient: HoodieTableMetaClient, schemaSpec: Option[StructType], @@ -80,6 +80,21 @@ case class HoodieFileIndex(spark: SparkSession, spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean } + /** + * Returns the FileStatus for all the base files (excluding log files). This should be used only for + * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic + * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter + * to filter out log files within Spark. + * + * @return List of FileStatus for base files + */ + def allFiles: Seq[FileStatus] = { + cachedAllInputFileSlices.values.flatten + .filter(_.getBaseFile.isPresent) + .map(_.getBaseFile.get().getFileStatus) + .toSeq + } + /** * Invoked by Spark to fetch list of latest base files per partition. * diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala index 9138b60619d5c..462d88e0297d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala @@ -32,26 +32,23 @@ import scala.collection.JavaConverters._ import scala.collection.mutable /** - * A file index which support partition prune for hoodie snapshot and read-optimized query. + * Common (engine-agnostic) File Index implementation enabling individual query engines to + * list Hudi Table contents based on the * - * Main steps to get the file list for query: - * 1、Load all files and partition values from the table path. - * 2、Do the partition prune by the partition filter condition. 
- * - * There are 3 cases for this: - * 1、If the partition columns size is equal to the actually partition path level, we - * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10") - * - * 2、If the partition columns size is not equal to the partition path level, but the partition - * column size is "1" (e.g. partition column is "dt", but the partition path is "2021/03/10" - * who's directory level is 3).We can still read it as a partitioned table. We will mapping the - * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). - * - * 3、Else the the partition columns size is not equal to the partition directory level and the - * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12"), - * we read it as a Non-Partitioned table because we cannot know how to mapping the partition - * path with the partition columns in this case. + *
+ * <ul>
+ *   <li>Table type (MOR, COW)</li>
+ *   <li>Query type (snapshot, read_optimized, incremental)</li>
+ *   <li>Query instant/range</li>
+ * </ul>
* + * @param engineContext Hudi engine-specific context + * @param metaClient Hudi table's meta-client + * @param configProperties unifying configuration (in the form of generic properties) + * @param queryType target query type + * @param queryPaths target DFS paths being queried + * @param specifiedQueryInstant instant as of which table is being queried + * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations + * @param fileStatusCache transient cache of fetched [[FileStatus]]es */ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, metaClient: HoodieTableMetaClient, @@ -105,21 +102,6 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, } } - /** - * Returns the FileStatus for all the base files (excluding log files). This should be used only for - * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic - * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter - * to filter out log files within Spark. - * - * @return List of FileStatus for base files - */ - def allFiles: Seq[FileStatus] = { - cachedAllInputFileSlices.values.flatten - .filter(_.getBaseFile.isPresent) - .map(_.getBaseFile.get().getFileStatus) - .toSeq - } - private def refresh0(): Unit = { val startTime = System.currentTimeMillis() val partitionFiles = loadPartitionPathFiles() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index bfcbb1dbb8c82..e4f7b82e9206e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -35,7 +35,16 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String -// TODO unify w/ HoodieFileIndex +/** + * Implementation of the [[AbstractHoodieTableFileIndex]] for Spark + * + * @param spark spark session + * @param metaClient Hudi table's meta-client + * @param schemaSpec optional table's schema + * @param configProperties unifying configuration (in the form of generic properties) + * @param specifiedQueryInstant instant as of which table is being queried + * @param fileStatusCache transient cache of fetched [[FileStatus]]es + */ class SparkHoodieTableFileIndex(spark: SparkSession, metaClient: HoodieTableMetaClient, schemaSpec: Option[StructType], From f902af42a7d8ca8cb60c34e71b4a95ba39eb4e72 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 14 Jan 2022 10:37:03 -0800 Subject: [PATCH 22/22] Moving `AbstractHoodieTableFileIndex`, `SparkHoodieTableFileIndex` to "hudi-spark-common" --- .../scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala | 0 .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala | 2 +- .../main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala | 0 3 files changed, 1 insertion(+), 1 deletion(-) rename hudi-spark-datasource/{hudi-spark => hudi-spark-common}/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala (100%) rename hudi-spark-datasource/{hudi-spark => hudi-spark-common}/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala (100%) diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index a38d0c5c7202f..d7aa94f485adf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -66,7 +66,7 @@ case class HoodieFileIndex(spark: SparkSession, metaClient = metaClient, schemaSpec = schemaSpec, configProperties = getConfigProperties(spark, options), - specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlUtils.formatQueryInstant), + specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), fileStatusCache = fileStatusCache ) with FileIndex { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
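
Note on the `FileStatusCacheTrait` introduced in patch 17: the index no longer depends on Spark's `FileStatusCache`; `SparkHoodieTableFileIndex.adapt` simply wraps Spark's cache behind the new trait. As a rough illustration of what a non-Spark caller could supply instead, the sketch below shows a hypothetical `InMemoryFileStatusCache` (not part of this patch series) backed by a plain concurrent map; it assumes only the three trait methods visible in the diff above and that the trait from `AbstractHoodieTableFileIndex.scala` is on the classpath.

import org.apache.hadoop.fs.{FileStatus, Path}
import scala.collection.concurrent.TrieMap

// Hypothetical example only: a trivial in-memory FileStatusCacheTrait implementation.
object InMemoryFileStatusCache extends FileStatusCacheTrait {
  private val cache = TrieMap.empty[Path, Array[FileStatus]]

  // Return the leaf files previously listed under this partition path, if cached.
  override def get(path: Path): Option[Array[FileStatus]] = cache.get(path)

  // Remember the leaf files fetched for this partition path.
  override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.update(path, leafFiles)

  // Drop all cached listings; AbstractHoodieTableFileIndex.refresh() calls this before re-listing.
  override def invalidate(): Unit = cache.clear()
}

On the Spark side the existing adapt(fileStatusCache) wrapper plays the same role, which is the point of extracting the trait: the listing cache stays pluggable per engine.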