diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala index 8a609d7d532b4..cdd3bfd8179b4 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala @@ -18,9 +18,12 @@ package org.apache.spark.sql import org.apache.hudi.SparkAdapterSupport.sparkAdapter +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Like, Literal, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateStruct, Expression, GetStructField, Like, Literal, Projection, SubqueryExpression, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} @@ -92,11 +95,16 @@ object HoodieCatalystExpressionUtils { * B is a subset of A */ def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = { - val attrs = from.toAttributes - val attrsMap = attrs.map(attr => (attr.name, attr)).toMap - val targetExprs = to.fields.map(f => attrsMap(f.name)) - - UnsafeProjection.create(targetExprs, attrs) + val projection = generateUnsafeProjectionInternal(from, to) + val identical = from == to + // NOTE: Have to use explicit [[Projection]] instantiation to stay compatible w/ Scala 2.11 + new UnsafeProjection { + override def apply(row: InternalRow): UnsafeRow = + row match { + case ur: UnsafeRow if identical => ur + case _ => projection(row) + } + } } /** @@ -248,6 +256,14 @@ object HoodieCatalystExpressionUtils { ) } + private def generateUnsafeProjectionInternal(from: StructType, to: StructType): UnsafeProjection = { + val attrs = from.toAttributes + val attrsMap = attrs.map(attr => (attr.name, attr)).toMap + val targetExprs = to.fields.map(f => attrsMap(f.name)) + + UnsafeProjection.create(targetExprs, attrs) + } + private def hasUnresolvedRefs(resolvedExpr: Expression): Boolean = resolvedExpr.collectFirst { case _: UnresolvedAttribute | _: UnresolvedFunction => true diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala new file mode 100644 index 0000000000000..a99a0213c5698 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema + +abstract class AvroProjection extends (GenericRecord => GenericRecord) + +object AvroProjection { + + /** + * Creates projection into provided [[Schema]] allowing to convert [[GenericRecord]] into + * new schema + */ + def create(schema: Schema): AvroProjection = { + val projection = (record: GenericRecord) => rewriteRecordWithNewSchema(record, schema) + // NOTE: Have to use explicit [[Projection]] instantiation to stay compatible w/ Scala 2.11 + new AvroProjection { + override def apply(record: GenericRecord): GenericRecord = + if (record.getSchema == schema) { + record + } else { + projection(record) + } + } + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 42e71e5e33241..1ab2a23cd04bc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -36,26 +36,29 @@ import org.apache.spark.sql.types.StructType * [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying * modes: * * - * NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the + * NOTE: The reason this Relation is used in-liue of Spark's default [[HadoopFsRelation]] is primarily due to the * fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists * as part of the record payload. 
In some cases, however, partition path might not necessarily be equal to the * verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect * partition field values being written */ -class BaseFileOnlyRelation(sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], - userSchema: Option[StructType], - globPaths: Seq[Path]) - extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { +case class BaseFileOnlyRelation(override val sqlContext: SQLContext, + override val metaClient: HoodieTableMetaClient, + override val optParams: Map[String, String], + private val userSchema: Option[StructType], + private val globPaths: Seq[Path], + private val prunedDataSchema: Option[StructType] = None) + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) + with SparkAdapterSupport { case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit override type FileSplit = HoodieBaseFileSplit + override type Relation = BaseFileOnlyRelation // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract // partition values from partition path @@ -67,6 +70,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, override lazy val mandatoryFields: Seq[String] = Seq.empty + override def updatePrunedDataSchema(prunedSchema: StructType): Relation = + this.copy(prunedDataSchema = Some(prunedSchema)) + override def imbueConfigs(sqlContext: SQLContext): Unit = { super.imbueConfigs(sqlContext) sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true") @@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { - val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters) - val fileSplits = partitions.values.toSeq - .flatMap { files => - files.flatMap { file => - // TODO fix, currently assuming parquet as underlying format - HoodieDataSourceHelper.splitFiles( - sparkSession = sparkSession, - file = file, - partitionValues = getPartitionColumnsAsInternalRow(file) - ) - } - } + val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters) + val fileSplits = fileSlices.flatMap { fileSlice => + // TODO fix, currently assuming parquet as underlying format + val fs = fileSlice.getBaseFile.get.getFileStatus + HoodieDataSourceHelper.splitFiles( + sparkSession = sparkSession, + file = fs, + partitionValues = getPartitionColumnsAsInternalRow(fs) + ) + } // NOTE: It's important to order the splits in the reverse order of their // size so that we can subsequently bucket them in an efficient manner .sortBy(_.length)(implicitly[Ordering[Long]].reverse) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 41e4f8cf7559b..c74a023d3d8aa 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -244,10 +244,10 @@ object DefaultSource { new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => - new 
MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient) + new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema) case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient) + new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema) case (_, _, true) => new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 52df84b17f8fb..bf3d38b808d32 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -19,12 +19,10 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hbase.io.hfile.CacheConfig import org.apache.hadoop.mapred.JobConf - import org.apache.hudi.HoodieBaseRelation._ import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.HoodieAvroUtils @@ -33,36 +31,35 @@ import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, Seri import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord} -import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.{ConfigUtils, StringUtils} import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.common.util.{ConfigUtils, StringUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.io.storage.HoodieAvroHFileReader - import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.HoodieCatalystExpressionUtils.convertToCatalystExpression +import org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression, generateUnsafeProjection} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} import org.apache.spark.sql.execution.FileRelation +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} -import org.apache.spark.sql.execution.datasources._ import 
org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SQLContext, SparkSession} +import org.apache.spark.sql.{Row, SQLContext, SparkSession} import org.apache.spark.unsafe.types.UTF8String + import java.net.URI import java.util.Locale - import scala.collection.JavaConverters._ import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} @@ -72,7 +69,7 @@ trait HoodieFileSplit {} case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) case class HoodieTableState(tablePath: String, - latestCommitTimestamp: String, + latestCommitTimestamp: Option[String], recordKeyField: String, preCombineFieldOpt: Option[String], usesVirtualKeys: Boolean, @@ -82,12 +79,13 @@ case class HoodieTableState(tablePath: String, recordMergerStrategy: String) /** - * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. + * Hoodie BaseRelation which extends [[PrunedFilteredScan]] */ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val metaClient: HoodieTableMetaClient, val optParams: Map[String, String], - schemaSpec: Option[StructType]) + private val schemaSpec: Option[StructType], + private val prunedDataSchema: Option[StructType]) extends BaseRelation with FileRelation with PrunedFilteredScan @@ -95,6 +93,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, with SparkAdapterSupport { type FileSplit <: HoodieFileSplit + type Relation <: HoodieBaseRelation imbueConfigs(sqlContext) @@ -105,7 +104,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig - protected lazy val basePath: String = metaClient.getBasePath + protected lazy val basePath: Path = metaClient.getBasePathV2 // NOTE: Record key-field is assumed singular here due to the either of // - In case Hudi's meta fields are enabled: record key will be pre-materialized (stored) as part @@ -176,14 +175,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) - protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) - - /** - * Data schema optimized (externally) by Spark's Optimizer. 
- * - * Please check scala-doc for [[updatePrunedDataSchema]] more details - */ - protected var optimizerPrunedDataSchema: Option[StructType] = None + protected lazy val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) /** * Controls whether partition values (ie values of partition columns) should be @@ -220,7 +212,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * NOTE: This fields are accessed by [[NestedSchemaPruning]] component which is only enabled for * Spark >= 3.1 */ - lazy val (fileFormat: FileFormat, fileFormatClassName: String) = + protected lazy val (fileFormat: FileFormat, fileFormatClassName: String) = metaClient.getTableConfig.getBaseFileFormat match { case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") case HoodieFileFormat.PARQUET => @@ -241,6 +233,25 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession)) + lazy val tableState: HoodieTableState = { + val recordMergerImpls = ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList + val recordMergerStrategy = getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY, + Option(metaClient.getTableConfig.getRecordMergerStrategy)) + + // Subset of the state of table's configuration as of at the time of the query + HoodieTableState( + tablePath = basePath.toString, + latestCommitTimestamp = queryTimestamp, + recordKeyField = recordKeyField, + preCombineFieldOpt = preCombineFieldOpt, + usesVirtualKeys = !tableConfig.populateMetaFields(), + recordPayloadClassName = tableConfig.getPayloadClass, + metadataConfig = fileIndex.metadataConfig, + recordMergerImpls = recordMergerImpls, + recordMergerStrategy = recordMergerStrategy + ) + } + /** * Columns that relation has to read from the storage to properly execute on its semantic: for ex, * for Merge-on-Read tables key fields as well and pre-combine field comprise mandatory set of columns, @@ -255,11 +266,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // NOTE: We're including compaction here since it's not considering a "commit" operation metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants - protected def latestInstant: Option[HoodieInstant] = - toScalaOption(timeline.lastInstant()) - - protected def queryTimestamp: Option[String] = - specifiedQueryTimestamp.orElse(latestInstant.map(_.getTimestamp)) + private def queryTimestamp: Option[String] = + specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) /** * Returns true in case table supports Schema on Read (Schema Evolution) @@ -285,8 +293,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ def canPruneRelationSchema: Boolean = (fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) && - // NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning) - // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently + // NOTE: In case this relation has already been pruned there's no point in pruning it again + prunedDataSchema.isEmpty && + // TODO(HUDI-5421) internal schema doesn't support nested schema pruning currently !hasSchemaOnRead override def sizeInBytes: Long = fileIndex.sizeInBytes @@ -295,7 +304,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // NOTE: Optimizer could prune the schema (applying for ex, 
[[NestedSchemaPruning]] rule) setting new updated // schema in-place (via [[setPrunedDataSchema]] method), therefore we have to make sure that we pick // pruned data schema (if present) over the standard table's one - optimizerPrunedDataSchema.getOrElse(tableStructSchema) + prunedDataSchema.getOrElse(tableStructSchema) } /** @@ -316,25 +325,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * NOTE: DO NOT OVERRIDE THIS METHOD */ override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - // NOTE: PLEAS READ CAREFULLY BEFORE MAKING CHANGES - // - // In case list of requested columns doesn't contain the Primary Key one, we - // have to add it explicitly so that - // - Merging could be performed correctly - // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], - // Spark still fetches all the rows to execute the query correctly - // + // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES // *Appending* additional columns to the ones requested by the caller is not a problem, as those - // will be "projected out" by the caller's projection; - // - // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM - // PROJECTION + // will be eliminated by the caller's projection; + // (!) Please note, however, that it's critical to avoid _reordering_ of the requested columns as this + // will break the upstream projection val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns) // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro // schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and // could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions // w/ more than 2 types are involved) - val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema) + val sourceSchema = prunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema) val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) = projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns) @@ -385,50 +386,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] - /** - * Get all PartitionDirectories based on globPaths if specified, otherwise use the table path. - * Will perform pruning if necessary - */ - private def listPartitionDirectories(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - if (globPaths.isEmpty) { - fileIndex.listFiles(partitionFilters, dataFilters) - } else { - val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths) - inMemoryFileIndex.listFiles(partitionFilters, dataFilters) - } - } - - /** - * Get all latest base files with partition paths, if globPaths is empty, will listing files - * under the table path. 
- */ - protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = { - val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters) - val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) + protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = { + queryTimestamp match { + case Some(ts) => + val partitionDirs = if (globPaths.isEmpty) { + fileIndex.listFiles(partitionFilters, dataFilters) + } else { + val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths) + inMemoryFileIndex.listFiles(partitionFilters, dataFilters) + } - val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus) + val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) - latestBaseFiles.groupBy(getPartitionPath) - } + fsView.getPartitionPaths.asScala.flatMap { partitionPath => + val relativePath = getRelativePartitionPath(basePath, partitionPath) + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, ts).iterator().asScala.toSeq + } - /** - * Get all fileSlices(contains base files and log files if exist) from globPaths if not empty, - * otherwise will use the table path to do the listing. - */ - protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = { - latestInstant.map { _ => - val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters) - val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) - - val queryTimestamp = this.queryTimestamp.get - fsView.getPartitionPaths.asScala.flatMap { partitionPath => - val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath) - fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq - } - }.getOrElse(Seq()) + case _ => Seq() + } } - protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = { + private def convertToExpressions(filters: Array[Filter]): Array[Expression] = { val catalystExpressions = filters.map(expr => convertToCatalystExpression(expr, tableStructSchema)) val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty } @@ -444,7 +423,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * Checks whether given expression only references partition columns * (and involves no sub-query) */ - protected def isPartitionPredicate(condition: Expression): Boolean = { + private def isPartitionPredicate(condition: Expression): Boolean = { // Validates that the provided names both resolve to the same entity val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver @@ -452,7 +431,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, !SubqueryExpression.hasSubquery(condition) } - protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { + private final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { // For a nested field in mandatory columns, we should first get the root-level field, and then // check for any missing column, as the requestedColumns should only contain root-level fields // We should only append root-level field as well @@ -461,26 +440,6 @@ 
abstract class HoodieBaseRelation(val sqlContext: SQLContext, requestedColumns ++ missing } - protected def getTableState: HoodieTableState = { - val recordMergerImpls = ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList - - val recordMergerStrategy = getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY, - Option(metaClient.getTableConfig.getRecordMergerStrategy)) - - // Subset of the state of table's configuration as of at the time of the query - HoodieTableState( - tablePath = basePath, - latestCommitTimestamp = queryTimestamp.get, - recordKeyField = recordKeyField, - preCombineFieldOpt = preCombineFieldOpt, - usesVirtualKeys = !tableConfig.populateMetaFields(), - recordPayloadClassName = tableConfig.getPayloadClass, - metadataConfig = fileIndex.metadataConfig, - recordMergerImpls = recordMergerImpls, - recordMergerStrategy = recordMergerStrategy - ) - } - def imbueConfigs(sqlContext: SQLContext): Unit = { sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") @@ -520,14 +479,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, } } - protected def getColName(f: StructField): String = { - if (sparkSession.sessionState.conf.caseSensitiveAnalysis) { - f.name - } else { - f.name.toLowerCase(Locale.ROOT) - } - } - /** * Hook for Spark's Optimizer to update expected relation schema after pruning * @@ -536,10 +487,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * Therefore more advanced optimizations (like [[NestedSchemaPruning]]) have to be carried out * by Spark's Optimizer holistically evaluating Spark's [[LogicalPlan]] */ - def updatePrunedDataSchema(prunedSchema: StructType): this.type = { - optimizerPrunedDataSchema = Some(prunedSchema) - this - } + def updatePrunedDataSchema(prunedSchema: StructType): Relation /** * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]] @@ -664,9 +612,6 @@ object HoodieBaseRelation extends SparkAdapterSupport { def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file) } - def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = - HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to) - def convertToAvroSchema(structSchema: StructType): Schema = sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index ad587ab4445b6..d7b60db49291c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -20,16 +20,15 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf - import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} -import org.apache.hudi.LogFileIterator.CONFIG_INSTANTIATION_LOCK -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload +import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK +import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible import org.apache.hudi.exception.HoodieException import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} + import java.io.Closeable case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition @@ -43,7 +42,7 @@ case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSp * the base file and the corresponding delta-log file to merge them correctly * *
  • Required-schema reader: is used when it's fine to only read row's projected columns. - * This could occur, when row could be merged with corresponding delta-log record leveraging while only having + * This could occur, when row could be merged with corresponding delta-log record while leveraging only * projected columns
 *   projected columns
 *
  • Required-schema reader (skip-merging): is used when when no merging will be performed (skip-merged). @@ -78,35 +77,36 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) - private val confBroadcast = sc.broadcast(new SerializableWritable(config)) - - private val whitelistedPayloadClasses: Set[String] = Seq( - classOf[OverwriteWithLatestAvroPayload] - ).map(_.getName).toSet + private val hadoopConfBroadcast = sc.broadcast(new SerializableWritable(config)) override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] - val iter = mergeOnReadPartition.split match { + val partition = split.asInstanceOf[HoodieMergeOnReadPartition] + val iter = partition.split match { case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema) projectedReader(dataFileOnlySplit.dataFile.get) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => - new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getConfig) + new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf) - case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => - val reader = fileReaders.requiredSchemaReaderSkipMerging - new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getConfig) + case split => + mergeType match { + case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL => + val reader = fileReaders.requiredSchemaReaderSkipMerging + new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf) - case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => - val reader = pickBaseFileReader - new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, getConfig) + case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL => + val reader = pickBaseFileReader() + new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf) + + case _ => throw new UnsupportedOperationException(s"Not supported merge type ($mergeType)") + } case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + - s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + - s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" + + s"file path: ${partition.split.dataFile.get.filePath}" + + s"log paths: ${partition.split.logFiles.toString}" + s"hoodie table path: ${tableState.tablePath}" + - s"spark partition Index: ${mergeOnReadPartition.index}" + + s"spark partition Index: ${partition.index}" + s"merge type: ${mergeType}") } @@ -119,16 +119,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, iter } - private def pickBaseFileReader: BaseFileReader = { + private def pickBaseFileReader(): BaseFileReader = { // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum // of the stored data possible, while still properly executing corresponding relation's semantic // and meet the query's requirements. 
// - // Here we assume that iff queried table - // a) It does use one of the standard (and whitelisted) Record Payload classes - // then we can avoid reading and parsing the records w/ _full_ schema, and instead only - // rely on projected one, nevertheless being able to perform merging correctly - if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) { + // Here we assume that iff queried table does use one of the standard (and whitelisted) + // Record Payload classes then we can avoid reading and parsing the records w/ _full_ schema, + // and instead only rely on projected one, nevertheless being able to perform merging correctly + if (isProjectionCompatible(tableState)) { fileReaders.requiredSchemaReader } else { fileReaders.fullSchemaReader @@ -138,10 +137,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, override protected def getPartitions: Array[Partition] = fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray - private def getConfig: Configuration = { - val conf = confBroadcast.value.value + private def getHadoopConf: Configuration = { + val conf = hadoopConfBroadcast.value.value + // TODO clean up, this lock is unnecessary CONFIG_INSTANTIATION_LOCK.synchronized { new Configuration(conf) } } } + +object HoodieMergeOnReadRDD { + val CONFIG_INSTANTIATION_LOCK = new Object() +} + diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala index 20ef9ded7c4fe..410d8b5f27d2f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -19,11 +19,11 @@ package org.apache.hudi import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} +import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection} +import org.apache.hudi.HoodieBaseRelation.BaseFileReader import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport import org.apache.hudi.LogFileIterator._ @@ -32,7 +32,7 @@ import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType -import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord, HoodieSparkRecord} +import org.apache.hudi.common.model._ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner import org.apache.hudi.common.util.HoodieRecordUtils import org.apache.hudi.config.HoodiePayloadConfig @@ -42,9 +42,10 @@ import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} import org.apache.hudi.util.CachingIterator -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieInternalRowUtils} +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.HoodieInternalRowUtils import 
org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.Projection import org.apache.spark.sql.types.StructType import java.io.Closeable @@ -64,7 +65,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, config: Configuration) extends CachingIterator[InternalRow] with AvroDeserializerSupport { - protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) + private val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) protected val payloadProps: TypedProperties = tableState.preCombineFieldOpt .map { preCombineField => @@ -81,8 +82,8 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) protected val logFileReaderStructType: StructType = tableSchema.structTypeSchema - protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema) - protected val requiredSchemaUnsafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema) + private val requiredSchemaAvroProjection: AvroProjection = AvroProjection.create(avroSchema) + private val requiredSchemaRowProjection: Projection = generateUnsafeProjection(logFileReaderStructType, structTypeSchema) private val logRecords = { val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) @@ -97,7 +98,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier - protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] = + private lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] = logRecords.iterator.map { case (_, record: HoodieSparkRecord) => Option(record) case (_, _: HoodieEmptyRecord[_]) => Option.empty @@ -117,11 +118,11 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, logRecordsIterator.hasNext && { logRecordsIterator.next() match { case Some(r: HoodieAvroIndexedRecord) => - val projectedAvroRecord = requiredSchemaSafeAvroProjection(r.getData.asInstanceOf[GenericRecord]) + val projectedAvroRecord = requiredSchemaAvroProjection(r.getData.asInstanceOf[GenericRecord]) nextRecord = deserialize(projectedAvroRecord) true case Some(r: HoodieSparkRecord) => - nextRecord = requiredSchemaUnsafeRowProjection(r.getData) + nextRecord = requiredSchemaRowProjection(r.getData) true case None => this.hasNextInternal } @@ -142,14 +143,14 @@ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, config: Configuration) extends LogFileIterator(split, dataSchema, requiredSchema, tableState, config) { - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) + private val requiredSchemaProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) private val baseFileIterator = baseFileReader(split.dataFile.get) override def doHasNext: Boolean = { if (baseFileIterator.hasNext) { // No merge is required, simply load current row and project into required schema - nextRecord = requiredSchemaUnsafeProjection(baseFileIterator.next()) + nextRecord = requiredSchemaProjection(baseFileIterator.next()) true } 
else { super[LogFileIterator].doHasNext @@ -174,20 +175,20 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, // - Full table's schema // - Projected schema // As such, no particular schema could be assumed, and therefore we rely on the caller - // to correspondingly set the scheme of the expected output of base-file reader + // to correspondingly set the schema of the expected output of base-file reader private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record") private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false) - private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) - private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField) - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) + private val requiredSchemaProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) + private val requiredSchemaAvroProjection = AvroProjection.create(avroSchema) private val baseFileIterator = baseFileReader(split.dataFile.get) - private val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy) + private val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, + tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy) override def doHasNext: Boolean = hasNextInternal @@ -201,7 +202,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, val updatedRecordOpt = removeLogRecord(curKey) if (updatedRecordOpt.isEmpty) { // No merge is required, simply load current row and project into required schema - nextRecord = requiredSchemaUnsafeProjection(curRow) + nextRecord = requiredSchemaProjection(curRow) true } else { val mergedRecordOpt = merge(curRow, updatedRecordOpt.get) @@ -229,24 +230,25 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema) val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps) toScalaOption(result) - .map(r => { + .map { r => val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight) val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema) projection.apply(r.getLeft.getData.asInstanceOf[InternalRow]) - }) + } case _ => val curRecord = new HoodieAvroIndexedRecord(serialize(curRow)) val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps) toScalaOption(result) - .map(r => deserialize(projectAvroUnsafe(r.getLeft.toIndexedRecord(r.getRight, payloadProps).get.getData.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder))) + .map { r => + val avroRecord = r.getLeft.toIndexedRecord(r.getRight, payloadProps).get.getData.asInstanceOf[GenericRecord] + deserialize(requiredSchemaAvroProjection(avroRecord)) + } } } } object LogFileIterator { - val CONFIG_INSTANTIATION_LOCK = new Object() - def scanLog(logFiles: List[HoodieLogFile], partitionPath: Path, logSchema: Schema, @@ -290,7 +292,9 @@ object LogFileIterator { .withBasePath(tablePath) .withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava) .withReaderSchema(logSchema) - 
.withLatestInstantTime(tableState.latestCommitTimestamp) + // NOTE: This part shall only be reached when at least one log is present in the file-group + // entailing that table has to have at least one commit + .withLatestInstantTime(tableState.latestCommitTimestamp.get) .withReadBlocksLazily( Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) @@ -334,12 +338,6 @@ object LogFileIterator { } } - def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = { - val fields = projectedSchema.getFields.asScala - fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name()))) - reusableRecordBuilder.build() - } - def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { // Determine partition path as an immediate parent folder of either // - The base file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index cefd319780535..93bf730a56d97 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -40,13 +40,18 @@ import scala.collection.immutable /** * @Experimental */ -class MergeOnReadIncrementalRelation(sqlContext: SQLContext, - optParams: Map[String, String], - userSchema: Option[StructType], - metaClient: HoodieTableMetaClient) - extends MergeOnReadSnapshotRelation(sqlContext, optParams, userSchema, Seq(), metaClient) with HoodieIncrementalRelationTrait { +case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext, + override val optParams: Map[String, String], + override val metaClient: HoodieTableMetaClient, + private val userSchema: Option[StructType], + private val prunedDataSchema: Option[StructType] = None) + extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema) + with HoodieIncrementalRelationTrait { - override type FileSplit = HoodieMergeOnReadFileSplit + override type Relation = MergeOnReadIncrementalRelation + + override def updatePrunedDataSchema(prunedSchema: StructType): Relation = + this.copy(prunedDataSchema = Some(prunedSchema)) override def imbueConfigs(sqlContext: SQLContext): Unit = { super.imbueConfigs(sqlContext) @@ -72,7 +77,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) - val hoodieTableState = getTableState // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately // filtered, since file-reader might not be capable to perform filtering new HoodieMergeOnReadRDD( @@ -81,7 +85,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, fileReaders = readers, tableSchema = tableSchema, requiredSchema = requiredSchema, - tableState = hoodieTableState, + tableState = tableState, mergeType = mergeType, fileSplits = fileSplits) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 1d0d533e5bb81..accfc8f2470bb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -20,15 +20,12 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema} +import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath +import org.apache.hudi.MergeOnReadSnapshotRelation.{getFilePath, isProjectionCompatible} import org.apache.hudi.avro.HoodieAvroUtils -import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath -import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -42,12 +39,35 @@ import scala.collection.JavaConverters._ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], logFiles: List[HoodieLogFile]) extends HoodieFileSplit -class MergeOnReadSnapshotRelation(sqlContext: SQLContext, - optParams: Map[String, String], - userSchema: Option[StructType], - globPaths: Seq[Path], - metaClient: HoodieTableMetaClient) - extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { +case class MergeOnReadSnapshotRelation(override val sqlContext: SQLContext, + override val optParams: Map[String, String], + override val metaClient: HoodieTableMetaClient, + private val globPaths: Seq[Path], + private val userSchema: Option[StructType], + private val prunedDataSchema: Option[StructType] = None) + extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, globPaths, userSchema, prunedDataSchema) { + + override type Relation = MergeOnReadSnapshotRelation + + override def updatePrunedDataSchema(prunedSchema: StructType): MergeOnReadSnapshotRelation = + this.copy(prunedDataSchema = Some(prunedSchema)) + +} + +/** + * Base implementation of the Merge-on-Read snapshot relation + * + * NOTE: Reason this is extracted as a standalone base class is such that both MOR + * Snapshot and Incremental relations could inherit from it while both being Scala + * case classes + */ +abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, + optParams: Map[String, String], + metaClient: HoodieTableMetaClient, + globPaths: Seq[Path], + userSchema: Option[StructType], + prunedDataSchema: Option[StructType]) + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) { override type FileSplit = HoodieMergeOnReadFileSplit @@ -75,6 +95,12 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_MERGE.defaultValue) + /** + * Determines whether relation's schema could be pruned by Spark's Optimizer + */ + override def canPruneRelationSchema: Boolean = + 
super.canPruneRelationSchema && isProjectionCompatible(tableState) + override def imbueConfigs(sqlContext: SQLContext): Unit = { super.imbueConfigs(sqlContext) sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true") @@ -89,7 +115,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) - val tableState = getTableState new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, @@ -160,10 +185,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, ) } else { val prunedRequiredSchema = { - val superfluousColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) + val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) val prunedStructSchema = StructType(requiredDataSchema.structTypeSchema.fields - .filterNot(f => superfluousColumnNames.contains(f.name))) + .filterNot(f => unusedMandatoryColumnNames.contains(f.name))) HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString) } @@ -220,6 +245,21 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, object MergeOnReadSnapshotRelation { + /** + * List of [[HoodieRecordPayload]] classes capable of merging projected records: + * in some cases, when for example, user is only interested in a handful of columns rather + * than the full row we will be able to optimize data throughput by only fetching the required + * columns. However, to properly fulfil MOR semantic particular [[HoodieRecordPayload]] in + * question should be able to merge records based on just such projected representation (including + * columns required for merging, such as primary-key, pre-combine key, etc) + */ + private val projectionCompatiblePayloadClasses: Set[String] = Seq( + classOf[OverwriteWithLatestAvroPayload] + ).map(_.getName).toSet + + def isProjectionCompatible(tableState: HoodieTableState): Boolean = + projectionCompatiblePayloadClasses.contains(tableState.recordPayloadClassName) + def getFilePath(path: Path): String = { // Here we use the Path#toUri to encode the path string, as there is a decode in // ParquetFileFormat#buildReaderWithPartitionValues in the spark project when read the table diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SafeAvroProjection.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SafeAvroProjection.scala deleted file mode 100644 index 90978069177f7..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SafeAvroProjection.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.hudi - -import org.apache.hudi.SafeAvroProjection.collectFieldOrdinals -import org.apache.hudi.common.util.ValidationUtils.checkState - -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} - -import scala.collection.JavaConverters._ - -// TODO extract to HoodieAvroSchemaUtils -abstract class AvroProjection extends (GenericRecord => GenericRecord) - -class SafeAvroProjection(sourceSchema: Schema, - projectedSchema: Schema, - reusableRecordBuilder: GenericRecordBuilder = null) extends AvroProjection { - - private val ordinals: List[Int] = collectFieldOrdinals(projectedSchema, sourceSchema) - private val recordBuilder: GenericRecordBuilder = - if (reusableRecordBuilder != null) { - reusableRecordBuilder - } else { - new GenericRecordBuilder(projectedSchema) - } - - override def apply(record: GenericRecord): GenericRecord = { - val fields = projectedSchema.getFields.asScala - checkState(fields.length == ordinals.length) - fields.zip(ordinals).foreach { - case (field, pos) => recordBuilder.set(field, record.get(pos)) - } - recordBuilder.build() - } -} - -object SafeAvroProjection { - def create(sourceSchema: Schema, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder = null): SafeAvroProjection = - new SafeAvroProjection( - sourceSchema = sourceSchema, - projectedSchema = projectedSchema, - reusableRecordBuilder = reusableRecordBuilder) - - /** - * Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which - * will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method - * - * @param projected target projected schema (which is a proper subset of [[source]] [[Schema]]) - * @param source source schema of the record being projected - * @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one - */ - private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { - projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList - } -} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala index 17a1cab27feb7..4768e4a3d8785 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala @@ -19,7 +19,7 @@ package org.apache.hudi.cdc import org.apache.hudi.HoodieBaseRelation.BaseFileReader -import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport} import org.apache.hudi.HoodieConversionUtils._ import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport import org.apache.hudi.avro.HoodieAvroUtils @@ -34,32 +34,28 @@ import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig} import 
org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory - import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.DefaultScalaModule - import org.apache.avro.Schema import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - import org.apache.spark.{Partition, SerializableWritable, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.HoodieAvroDeserializer -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.Projection import org.apache.spark.sql.types.StringType import org.apache.spark.unsafe.types.UTF8String import java.io.Closeable import java.util.Properties import java.util.stream.Collectors - import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.mutable @@ -154,7 +150,7 @@ class HoodieCDCRDD( .build() HoodieTableState( pathToString(basePath), - split.changes.last.getInstant, + Some(split.changes.last.getInstant), recordKeyField, preCombineFieldOpt, usesVirtualKeys = false, @@ -181,7 +177,7 @@ class HoodieCDCRDD( private lazy val serializer = sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema, avroSchema, nullable = false) - private lazy val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) + private lazy val avroProjection = AvroProjection.create(avroSchema) private lazy val cdcAvroSchema: Schema = HoodieCDCUtils.schemaBySupplementalLoggingMode( cdcSupplementalLoggingMode, @@ -197,7 +193,7 @@ class HoodieCDCRDD( sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema) } - private lazy val projection: UnsafeProjection = generateUnsafeProjection(cdcSchema, requiredCdcSchema) + private lazy val projection: Projection = generateUnsafeProjection(cdcSchema, requiredCdcSchema) // Iterator on cdc file private val cdcFileIter = split.changes.iterator @@ -381,7 +377,7 @@ class HoodieCDCRDD( val existingRecordOpt = beforeImageRecords.get(key) if (existingRecordOpt.isEmpty) { // a new record is inserted. - val insertedRecord = projectAvroUnsafe(indexedRecord.get) + val insertedRecord = avroProjection(indexedRecord.get.asInstanceOf[GenericRecord]) recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT) recordToLoad.update(2, null) recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord)) @@ -392,7 +388,7 @@ class HoodieCDCRDD( // a existed record is updated. 
val existingRecord = existingRecordOpt.get val merged = merge(existingRecord, logRecord) - val mergeRecord = projectAvroUnsafe(merged) + val mergeRecord = avroProjection(merged.asInstanceOf[GenericRecord]) if (existingRecord != mergeRecord) { recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE) recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord)) @@ -612,18 +608,10 @@ class HoodieCDCRDD( toScalaOption(record.toIndexedRecord(avroSchema, payloadProps)).map(_.getData) } - private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = { - LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord], - avroSchema, reusableRecordBuilder) - } - private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_]): IndexedRecord = { newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(curAvroRecord, avroSchema, payloadProps).get() } - private def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = - HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to) - override def close(): Unit = {} } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala index 0e39347ae92e8..d8f53a15dd600 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala @@ -149,7 +149,7 @@ class HoodieStreamSource( .map(serDe.serializeRow) case HoodieTableType.MERGE_ON_READ => val requiredColumns = schema.fields.map(_.name) - new MergeOnReadIncrementalRelation(sqlContext, incParams, Some(schema), metaClient) + new MergeOnReadIncrementalRelation(sqlContext, incParams, metaClient, Some(schema)) .buildScan(requiredColumns, Array.empty[Filter]) .asInstanceOf[RDD[InternalRow]] case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 9eda3e8be1001..21194eaaeeb47 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -116,8 +116,7 @@ object HoodieAnalysis { } val nestedSchemaPruningRule = instantiateKlass(nestedSchemaPruningClass) - // TODO(HUDI-5443) re-enable - //optimizerRules += (_ => nestedSchemaPruningRule) + optimizerRules += (_ => nestedSchemaPruningRule) } // NOTE: [[HoodiePruneFileSourcePartitions]] is a replica in kind to Spark's diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 513ab45c16af0..de3ace23fb8b4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -42,6 +42,7 @@ import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import 
org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.BooleanType import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -427,6 +428,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testPrunedFiltered(recordType: HoodieRecordType) { + val (writeOpts, readOpts) = getWriterReaderOpts(recordType) // First Operation: diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index 059f692514c88..7077b2d37a33e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -174,7 +174,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { fs.exists(path) } - protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = { val conf = spark.sessionState.conf val currentValues = pairs.unzip._1.map { k => if (conf.contains(k)) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala index 8c631f9bc2bcc..4f758a9e4f78b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.common.config.HoodieCommonConfig +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan} @@ -37,9 +39,7 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp private def executePlan(plan: LogicalPlan): SparkPlan = spark.sessionState.executePlan(plan).executedPlan - // TODO(HUDI-5443) re-enable - /* - test("Test NestedSchemaPruning optimization (COW/MOR)") { + test("Test NestedSchemaPruning optimization successful") { withTempDir { tmp => // NOTE: This tests are only relevant for Spark >= 3.1 // TODO extract tests into a separate spark-version-specific module @@ -48,36 +48,15 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp val tableName = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |CREATE TABLE $tableName ( - | id int, - | item STRUCT, - | ts long - |) USING HUDI TBLPROPERTIES ( - | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.populate.meta.fields = 'false' - |) - |LOCATION '$tablePath' - """.stripMargin) - - spark.sql( - s""" - |INSERT INTO $tableName - |SELECT 1 AS id, named_struct('name', 'a1', 'price', 10) AS item, 123456 AS ts - """.stripMargin) + createTableWithNestedStructSchema(tableType, 
tableName, tablePath) val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName") val expectedSchema = StructType(Seq( - StructField("id", IntegerType), - StructField("item" , StructType(Seq(StructField("name", StringType)))) + StructField("id", IntegerType, nullable = false), + StructField("item" , StructType(Seq(StructField("name", StringType, nullable = false))), nullable = false) )) - spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) - val expectedReadSchemaClause = "ReadSchema: struct<id:int,item:struct<name:string>>" val hint = s""" | |] |""".stripMargin - val executedPlan = executePlan(selectDF.logicalPlan) + // NOTE: We're disabling WCE to simplify resulting plan + spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + // NOTE: Unfortunately, we can't use pattern-matching to extract required fields, due to a need to maintain // compatibility w/ Spark 2.4 - executedPlan match { + selectDF.queryExecution.executedPlan match { // COW case ProjectExec(_, fileScan: FileSourceScanExec) => val tableIdentifier = fileScan.tableIdentifier @@ -115,10 +96,102 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp //assertEquals(tableName, tableIdentifier.get.table) //assertEquals(expectedSchema, requiredSchema, hint) } + + // Execute the query to make sure it's working as expected (smoke test) + selectDF.count } } } } - */ + test("Test NestedSchemaPruning optimization unsuccessful") { + withTempDir { tmp => + // NOTE: These tests are only relevant for Spark >= 3.1 + // TODO extract tests into a separate spark-version-specific module + if (HoodieSparkUtils.gteqSpark3_1) { + // TODO add cow + Seq("mor").foreach { tableType => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + + // NOTE: Set of opts that will make [[NestedSchemaPruning]] ineffective + val (writeOpts, readOpts): (Map[String, String], Map[String, String]) = + tableType match { + case "cow" => + (Map(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key -> "true"), + Map(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key -> "true")) + + case "mor" => + (Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload"), + Map.empty) + } + + createTableWithNestedStructSchema(tableType, tableName, tablePath, writeOpts) + + val selectDF = withSQLConf(readOpts.toSeq: _*) { + spark.sql(s"SELECT id, item.name FROM $tableName") + } + + val expectedSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("item", + StructType(Seq( + StructField("name", StringType, nullable = false), + StructField("price", IntegerType, nullable = false))), nullable = false) + )) + + val expectedReadSchemaClause = "ReadSchema: struct<id:int,item:struct<name:string,price:int>>" + + // NOTE: We're disabling WCE to simplify resulting plan + spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + + // NOTE: Unfortunately, we can't use pattern-matching to extract required fields, due to a need to maintain + // compatibility w/ Spark 2.4 + selectDF.queryExecution.executedPlan match { + // COW + case ProjectExec(_, fileScan: FileSourceScanExec) => + val tableIdentifier = fileScan.tableIdentifier + val requiredSchema = fileScan.requiredSchema + + assertEquals(tableName, tableIdentifier.get.table) + assertEquals(expectedSchema, requiredSchema) + + // MOR + case ProjectExec(_, dataScan: RowDataSourceScanExec) => + // NOTE: This is a
temporary solution to assert for Spark 2.4, until it's deprecated + val explainedPlan = explain(selectDF.queryExecution.logical) + assertTrue(explainedPlan.contains(expectedReadSchemaClause)) + + // TODO replace w/ after Spark 2.4 deprecation + //val tableIdentifier = dataScan.tableIdentifier + //val requiredSchema = dataScan.requiredSchema + // + //assertEquals(tableName, tableIdentifier.get.table) + //assertEquals(expectedSchema, requiredSchema, hint) + } + + // Execute the query to make sure it's working as expected (smoke test) + selectDF.count + } + } + } + } + + private def createTableWithNestedStructSchema(tableType: String, + tableName: String, + tablePath: String, + opts: Map[String, String] = Map.empty): Unit = { + spark.sql( + s""" + |CREATE TABLE $tableName USING HUDI TBLPROPERTIES ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.populate.meta.fields = 'false' + | ${if (opts.nonEmpty) "," + opts.map{ case (k, v) => s"'$k' = '$v'" }.mkString(",") else ""} + |) + |LOCATION '$tablePath' + |AS SELECT 1 AS id, named_struct('name', 'a1', 'price', 10) AS item, 123456 AS ts + """.stripMargin) + } }
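
Note on the projection-compatibility gate introduced in MergeOnReadSnapshotRelation above: the sketch below illustrates how it can drive mandatory-column pruning on the MOR read path. Only `MergeOnReadSnapshotRelation.isProjectionCompatible` and `HoodieTableState` come from this patch; the helper name, its parameters, and the exact pruning shape are illustrative assumptions.

```scala
import org.apache.hudi.{HoodieTableState, MergeOnReadSnapshotRelation}
import org.apache.spark.sql.types.StructType

object ProjectionCompatibilitySketch {

  // Drop the mandatory columns (record key, pre-combine key, ...) that the query did not ask for,
  // but only when the configured payload can merge records from such a projected representation.
  def pruneRequiredSchema(tableState: HoodieTableState,
                          requestedColumns: Seq[String],
                          mandatoryColumns: Seq[String],
                          requiredSchema: StructType): StructType = {
    if (MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
      // OverwriteWithLatestAvroPayload simply keeps the latest record, so the unused
      // mandatory columns can be pruned away from the read schema
      val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
      StructType(requiredSchema.fields.filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
    } else {
      // Payloads such as DefaultHoodieRecordPayload may need the full row to merge correctly,
      // so every mandatory column has to stay in the read schema
      requiredSchema
    }
  }
}
```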
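The `withSQLConf` change in `HoodieSparkSqlTestBase` only generifies the signature from `(=> Unit): Unit` to `[T](=> T): T`, so the block's result (here the DataFrame built under the read options) can be returned to the caller. For reference, a self-contained sketch of such a helper using Spark's public `RuntimeConfig` API; the test base itself works against the internal `SQLConf`, and the explicit `SparkSession` parameter is only added here to keep the sketch standalone.

```scala
import org.apache.spark.sql.SparkSession

object SqlConfSketch {

  // Runs `f` with the given SQL confs set, restores the previous values afterwards,
  // and returns whatever the block produced -- which is what the [T] generification enables.
  def withSQLConf[T](spark: SparkSession)(pairs: (String, String)*)(f: => T): T = {
    val conf = spark.conf
    // Remember the current values so they can be restored afterwards
    val previous = pairs.map { case (k, _) => k -> conf.getOption(k) }
    pairs.foreach { case (k, v) => conf.set(k, v) }
    try f finally {
      previous.foreach {
        case (k, Some(v)) => conf.set(k, v)
        case (k, None) => conf.unset(k)
      }
    }
  }
}

// Usage mirroring the test above (a hypothetical `readOpts` map):
//   val df = SqlConfSketch.withSQLConf(spark)(readOpts.toSeq: _*) { spark.sql("SELECT ...") }
```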
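To check outside the test harness whether the re-enabled NestedSchemaPruning rule took effect, the executed plan can be inspected the same way the tests do. A sketch, assuming a SparkSession with the Hudi extension enabled and an existing table `tbl` carrying an `item` struct column (both are assumptions, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, RowDataSourceScanExec}

object NestedPruningCheckSketch {

  def printReadSchema(spark: SparkSession): Unit = {
    // Disable whole-stage codegen so the scan node is not wrapped and stays easy to match on
    spark.conf.set("spark.sql.codegen.wholeStage", "false")

    val df = spark.sql("SELECT id, item.name FROM tbl")
    df.queryExecution.executedPlan match {
      // COW surfaces as a FileSourceScanExec
      case ProjectExec(_, scan: FileSourceScanExec) =>
        // With pruning effective this prints e.g. struct<id:int,item:struct<name:string>>
        println(scan.requiredSchema.catalogString)
      // MOR surfaces as a RowDataSourceScanExec; fall back to the textual plan and look for
      // the ReadSchema clause, as the tests above do for Spark 2.4 compatibility
      case ProjectExec(_, _: RowDataSourceScanExec) =>
        println(df.queryExecution.executedPlan.toString())
      case other =>
        println(s"Unexpected plan shape: ${other.nodeName}")
    }
  }
}
```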