diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala index 88ac16933186f..58789681c54cd 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.internal.SQLConf trait HoodieCatalystPlansUtils { @@ -77,6 +78,15 @@ trait HoodieCatalystPlansUtils { */ def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] + + /** + * Spark requires file formats to append the partition path fields to the end of the schema. + * For tables where the partition path fields are not at the end of the schema, we don't want + * to return the schema in the wrong order when they do a query like "select *". To fix this + * behavior, we apply a projection onto FileScan when the file format is NewHudiParquetFileFormat + */ + def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan + /** * Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API * changes in Spark 3.3 diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 782a49ac18979..041beba95df91 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -26,11 +26,10 @@ import org.apache.spark.sql._ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources._ @@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.parser.HoodieExtendedParserInterface import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, StructType} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import org.apache.spark.storage.StorageLevel import java.util.{Locale, TimeZone} @@ -165,7 +165,9 @@ trait SparkAdapter extends Serializable { 
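A minimal sketch of what a Spark 3.x implementation of `applyNewHoodieParquetFileFormatProjection` could look like, to make the intent of the new trait method concrete. The `NewHoodieParquetFileFormat` match and its `isProjected` guard correspond to code added later in this patch; the wrapper object and the use of `fs.dataSchema` as the ordering source are assumptions for illustration only, not the actual adapter code.

```scala
// Illustrative sketch only: one possible Spark 3.x shape for
// applyNewHoodieParquetFileFormatProjection. It re-projects the output of a scan that uses
// NewHoodieParquetFileFormat back into the table's declared column order, since Spark appends
// partition-path fields to the end of the physical schema.
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat

object ProjectionSketch {
  def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = plan match {
    case lr @ LogicalRelation(fs: HadoopFsRelation, _, _, _)
      if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat]
        && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
      // Flip the guard so the rule fires only once per scan and the analyzer does not loop.
      fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true
      // Assumption: fs.dataSchema carries the Hudi table schema in its original field order,
      // so reordering the scan output to follow it restores the expected "select *" order.
      val ordered = fs.dataSchema.fieldNames.flatMap(name => lr.output.find(_.name == name)).toSeq
      Project(ordered, lr)
    case _ => plan
  }
}
```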
/** * Create instance of [[ParquetFileFormat]] */ - def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] + def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] + + def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch /** * Create instance of [[InterpretedPredicate]] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 556b0feef1cdb..f4029445f6114 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -17,4 +17,4 @@ org.apache.hudi.DefaultSource -org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat \ No newline at end of file +org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 59c2a60a3adfa..6e14e262d2c83 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -87,6 +87,15 @@ object DataSourceReadOptions { s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" + s"${REALTIME_SKIP_MERGE_OPT_VAL}") + val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.read.use.new.parquet.file.format") + .defaultValue("false") + .markAdvanced() + .sinceVersion("0.14.0") + .withDocumentation("Read using the new Hudi parquet file format. The new Hudi parquet file format is " + + "introduced as an experimental feature in 0.14.0. Currently, the new Hudi parquet file format only applies " + + "to bootstrap and MOR queries. 
Schema evolution is also not supported by the new file format.") + val READ_PATHS: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.paths") .noDefaultValue() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 3e5cf351ba136..5ecf250eaabb1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -246,6 +246,16 @@ object DefaultSource { } else if (isCdcQuery) { CDCRelation.getCDCRelation(sqlContext, metaClient, parameters) } else { + lazy val newHudiFileFormatUtils = if (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, + USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths == null || globPaths.isEmpty) + && parameters.getOrElse(REALTIME_MERGE.key(), REALTIME_MERGE.defaultValue()) + .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) { + val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext, metaClient, parameters, userSchema) + if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils) + } else { + Option.empty + } + (tableType, queryType, isBootstrappedTable) match { case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | @@ -256,16 +266,28 @@ object DefaultSource { new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => - new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema) + if (newHudiFileFormatUtils.isEmpty) { + new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema) + } else { + newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = false) + } case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema) case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) => - new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters) + if (newHudiFileFormatUtils.isEmpty) { + new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters) + } else { + newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = true) + } case (_, _, true) => - resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters) + if (newHudiFileFormatUtils.isEmpty) { + resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters) + } else { + newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap = true) + } case (_, _, _) => throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 2f9579d629ee7..fea7781f84d20 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -56,7 +56,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression import org.apache.spark.sql.execution.FileRelation import 
org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{LegacyHoodieParquetFileFormat, ParquetFileFormat} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType @@ -241,8 +241,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, case HoodieFileFormat.PARQUET => // We're delegating to Spark to append partition values to every row only in cases // when these corresponding partition-values are not persisted w/in the data file itself - val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get - (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID) + val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get + (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID) } /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 49db12b792cef..eb8ddfdf870c4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -52,7 +52,7 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { options: Map[String, String], hadoopConf: Configuration, appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { - val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get + val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( sparkSession = sparkSession, dataSchema = dataSchema, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 9791d39e2807d..964ef970c91e7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -18,7 +18,7 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties} +import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, convertFilterForTimestampKeyGenerator, getConfigProperties} import org.apache.hudi.HoodieSparkConfUtils.getConfigValue import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT} import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} @@ -100,6 +100,8 @@ case class HoodieFileIndex(spark: SparkSession, override def rootPaths: Seq[Path] = getQueryPaths.asScala + var shouldBroadcast: Boolean = false + /** * Returns the FileStatus for all the base 
files (excluding log files). This should be used only for * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic @@ -142,26 +144,49 @@ case class HoodieFileIndex(spark: SparkSession, override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map { case (partitionOpt, fileSlices) => - val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => { - val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null))) - val logFilesStatus = if (includeLogFiles) { - fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus)) + if (shouldBroadcast) { + val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = fileSlices.map(slice => { + if (slice.getBaseFile.isPresent) { + slice.getBaseFile.get().getFileStatus + } else if (slice.getLogFiles.findAny().isPresent) { + slice.getLogFiles.findAny().get().getFileStatus + } else { + null + } + }).filter(slice => slice != null) + val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent + || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)). + foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) } + if (c.nonEmpty) { + PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly) } else { - java.util.stream.Stream.empty() + PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly) } - val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala - baseFileStatusOpt.foreach(f => files.append(f)) - files - }) - PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles) + } else { + val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => { + val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null))) + val logFilesStatus = if (includeLogFiles) { + fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus)) + } else { + java.util.stream.Stream.empty() + } + val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala + baseFileStatusOpt.foreach(f => files.append(f)) + files + }) + PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles) + } } hasPushedDownPartitionPredicates = true if (shouldReadAsPartitionedTable()) { prunedPartitionsAndFilteredFileSlices - } else { + } else if (shouldBroadcast) { + assert(partitionSchema.isEmpty) + prunedPartitionsAndFilteredFileSlices + }else { Seq(PartitionDirectory(InternalRow.empty, prunedPartitionsAndFilteredFileSlices.flatMap(_.files))) } } @@ -244,7 +269,11 @@ case class HoodieFileIndex(spark: SparkSession, // Prune the partition path by the partition filters // NOTE: Non-partitioned tables are assumed to consist from a single partition // encompassing the whole table - val prunedPartitions = listMatchingPartitionPaths (partitionFilters) + val prunedPartitions = if (shouldBroadcast) { + listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)) + } else { + listMatchingPartitionPaths(partitionFilters) + } getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map( { case (partition, fileSlices) => (Option.apply(partition), fileSlices.asScala) }) } diff --git 
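To show how the read path above is exercised end to end, here is a hedged usage sketch for the `hoodie.datasource.read.use.new.parquet.file.format` option introduced in `DataSourceReadOptions` earlier in this patch. It assumes a standard `SparkSession` named `spark`; the table path and view name are placeholders. Per the `DefaultSource` changes, the new format is only used when the option is true, no glob paths are supplied, the realtime merge type is payload combining, and schema-on-read (schema evolution) is not in use; otherwise the existing relations are used.

```scala
// Hypothetical usage: read a MOR or bootstrapped table through the experimental
// NewHoodieParquetFileFormat path. The option key is the one added in DataSourceReadOptions;
// the base path and query are placeholders.
val df = spark.read
  .format("hudi")
  .option("hoodie.datasource.read.use.new.parquet.file.format", "true")
  .load("/tmp/hudi_trips_mor")

df.createOrReplaceTempView("hudi_trips")
spark.sql("select * from hudi_trips").show(false)
```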
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 6daa84e1b4263..79dad997b4cd1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper import org.apache.hudi.table.HoodieSparkTable import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext} @@ -206,7 +206,7 @@ class IncrementalRelation(val sqlContext: SQLContext, sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath) sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { - case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID + case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID case HoodieFileFormat.ORC => "orc" } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala index 942ade81cc5a6..054fcc799d7af 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -30,7 +30,7 @@ import org.apache.hudi.LogFileIterator._ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.fs.FSUtils.{buildInlineConf, getRelativePartitionPath} import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.model._ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner @@ -61,15 +61,26 @@ import scala.util.Try class LogFileIterator(logFiles: List[HoodieLogFile], partitionPath: Path, tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredStructTypeSchema: StructType, + requiredAvroSchema: Schema, tableState: HoodieTableState, config: Configuration) extends CachingIterator[InternalRow] with AvroDeserializerSupport { + def this(logFiles: List[HoodieLogFile], + partitionPath: Path, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + config: Configuration) { + this(logFiles, partitionPath, tableSchema, requiredSchema.structTypeSchema, + new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, config) + } def this(split: HoodieMergeOnReadFileSplit, tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, - tableState: HoodieTableState, config: Configuration) { + tableState: HoodieTableState, + config: Configuration) 
{ this(split.logFiles, getPartitionPath(split), tableSchema, requiredSchema, tableState, config) } private val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) @@ -83,8 +94,8 @@ class LogFileIterator(logFiles: List[HoodieLogFile], } .getOrElse(new TypedProperties()) - protected override val avroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema + protected override val avroSchema: Schema = requiredAvroSchema + protected override val structTypeSchema: StructType = requiredStructTypeSchema protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) protected val logFileReaderStructType: StructType = tableSchema.structTypeSchema @@ -142,20 +153,22 @@ class LogFileIterator(logFiles: List[HoodieLogFile], * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) */ -private class SkipMergeIterator(logFiles: List[HoodieLogFile], +class SkipMergeIterator(logFiles: List[HoodieLogFile], partitionPath: Path, baseFileIterator: Iterator[InternalRow], readerSchema: StructType, dataSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredStructTypeSchema: StructType, + requiredAvroSchema: Schema, tableState: HoodieTableState, config: Configuration) - extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredSchema, tableState, config) { + extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredStructTypeSchema, requiredAvroSchema, tableState, config) { def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader, dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, tableState: HoodieTableState, config: Configuration) { this(split.logFiles, getPartitionPath(split), baseFileReader(split.dataFile.get), - baseFileReader.schema, dataSchema, requiredSchema, tableState, config) + baseFileReader.schema, dataSchema, requiredSchema.structTypeSchema, + new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, config) } private val requiredSchemaProjection = generateUnsafeProjection(readerSchema, structTypeSchema) @@ -181,11 +194,23 @@ class RecordMergingFileIterator(logFiles: List[HoodieLogFile], baseFileIterator: Iterator[InternalRow], readerSchema: StructType, dataSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredStructTypeSchema: StructType, + requiredAvroSchema: Schema, tableState: HoodieTableState, config: Configuration) - extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredSchema, tableState, config) { + extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredStructTypeSchema, requiredAvroSchema, tableState, config) { + def this(logFiles: List[HoodieLogFile], + partitionPath: Path, + baseFileIterator: Iterator[InternalRow], + readerSchema: StructType, + dataSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + config: Configuration) { + this(logFiles, partitionPath, baseFileIterator, readerSchema, dataSchema, requiredSchema.structTypeSchema, + new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, config) + } def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader, dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, tableState: 
HoodieTableState, config: Configuration) { this(split.logFiles, getPartitionPath(split), baseFileReader(split.dataFile.get), diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala new file mode 100644 index 0000000000000..5dd85c973b682 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.hudi.HoodieBaseRelation._ +import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.common.config.ConfigProperty +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.common.util.{ConfigUtils, StringUtils} +import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter +import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.{FileStatusCache, HadoopFsRelation} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{SQLContext, SparkSession} + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext, + val metaClient: HoodieTableMetaClient, + val optParamsInput: Map[String, String], + private val schemaSpec: Option[StructType]) extends SparkAdapterSupport { + protected val sparkSession: SparkSession = sqlContext.sparkSession + + protected val optParams: Map[String, String] = optParamsInput.filter(kv => !kv._1.equals(DATA_QUERIES_ONLY.key())) + protected def tableName: String = metaClient.getTableConfig.getTableName + + protected lazy val resolver: Resolver = 
sparkSession.sessionState.analyzer.resolver + + private lazy val metaFieldNames = HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet + + protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + protected lazy val jobConf = new JobConf(conf) + + protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig + + protected lazy val basePath: Path = metaClient.getBasePathV2 + + protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { + val schemaResolver = new TableSchemaResolver(metaClient) + val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) { + None + } else { + Try { + specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata) + .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata) + } match { + case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt) + case Failure(e) => + None + } + } + + val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName) + val avroSchema = internalSchemaOpt.map { is => + AvroInternalSchemaConverter.convert(is, namespace + "." + name) + } orElse { + specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema) + } orElse { + schemaSpec.map(s => convertToAvroSchema(s, tableName)) + } getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + throw new HoodieSchemaException("Failed to fetch schema from the table") + } + } + + (avroSchema, internalSchemaOpt) + } + + protected lazy val tableStructSchema: StructType = { + val converted = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField + + // NOTE: Here we annotate meta-fields with corresponding metadata such that Spark (>= 3.2) + // is able to recognize such fields as meta-fields + StructType(converted.map { field => + if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName, field.name))) { + field.copy(metadata = metaFieldMetadata) + } else { + field + } + }) + } + + protected lazy val preCombineFieldOpt: Option[String] = + Option(tableConfig.getPreCombineField) + .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match { + // NOTE: This is required to compensate for cases when empty string is used to stub + // property value to avoid it being set with the default value + // TODO(HUDI-3456) cleanup + case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f) + case _ => None + } + + protected def timeline: HoodieTimeline = + // NOTE: We're including compaction here since it's not considering a "commit" operation + metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants + + protected lazy val recordKeyField: String = + if (tableConfig.populateMetaFields()) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = tableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } + + private def queryTimestamp: Option[String] = + specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) + + protected lazy val specifiedQueryTimestamp: Option[String] = + optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) + + private def getConfigValue(config: ConfigProperty[String], + defaultValueOption: Option[String] = Option.empty): String = { + optParams.getOrElse(config.key(), + sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue()))) + } + + protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key, + DataSourceReadOptions.REALTIME_MERGE.defaultValue) + + protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = { + // Controls whether partition columns (which are the source for the partition path values) should + // be omitted from persistence in the data files. On the read path it affects whether partition values (values + // of partition columns) will be read from the data file or extracted from partition path + val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty + val shouldExtractPartitionValueFromPath = + optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, + DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean + val shouldUseBootstrapFastRead = optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean + + shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || shouldUseBootstrapFastRead + } + + protected lazy val mandatoryFieldsForMerging: Seq[String] = + Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) + + protected lazy val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) + + def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined + + def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation = { + + val fileIndex = HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession), isMOR) + val recordMergerImpls = ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList + val recordMergerStrategy = getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY, + Option(metaClient.getTableConfig.getRecordMergerStrategy)) + + val tableState = // Subset of the state of table's configuration as of at the time of the query + HoodieTableState( + tablePath = basePath.toString, + latestCommitTimestamp = queryTimestamp, + recordKeyField = recordKeyField, + preCombineFieldOpt = preCombineFieldOpt, + usesVirtualKeys = !tableConfig.populateMetaFields(), + recordPayloadClassName = tableConfig.getPayloadClass, + metadataConfig = fileIndex.metadataConfig, + recordMergerImpls = recordMergerImpls, + recordMergerStrategy = recordMergerStrategy + ) + + val mandatoryFields = if (isMOR) { + mandatoryFieldsForMerging + } else { + Seq.empty + } + fileIndex.shouldBroadcast = true + HadoopFsRelation( + location = fileIndex, + partitionSchema = fileIndex.partitionSchema, + dataSchema = fileIndex.dataSchema, + bucketSpec = None, + fileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState), + sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)), + metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR, isBootstrap), + optParams)(sparkSession) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala new file mode 100644 index 0000000000000..c9468e2d601f9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.common.model.FileSlice +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{DataType, Decimal} +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +class PartitionFileSliceMapping(internalRow: InternalRow, + broadcast: Broadcast[Map[String, FileSlice]]) extends InternalRow { + + def getSlice(fileId: String): Option[FileSlice] = { + broadcast.value.get(fileId) + } + + def getInternalRow: InternalRow = internalRow + + override def numFields: Int = internalRow.numFields + + override def setNullAt(i: Int): Unit = internalRow.setNullAt(i) + + override def update(i: Int, value: Any): Unit = internalRow.update(i, value) + + override def copy(): InternalRow = new PartitionFileSliceMapping(internalRow.copy(), broadcast) + + override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal) + + override def getBoolean(ordinal: Int): Boolean = internalRow.getBoolean(ordinal) + + override def getByte(ordinal: Int): Byte = internalRow.getByte(ordinal) + + override def getShort(ordinal: Int): Short = internalRow.getShort(ordinal) + + override def getInt(ordinal: Int): Int = internalRow.getInt(ordinal) + + override def getLong(ordinal: Int): Long = internalRow.getLong(ordinal) + + override def getFloat(ordinal: Int): Float = internalRow.getFloat(ordinal) + + override def getDouble(ordinal: Int): Double = internalRow.getDouble(ordinal) + + override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = internalRow.getDecimal(ordinal, precision, scale) + + override def getUTF8String(ordinal: Int): UTF8String = internalRow.getUTF8String(ordinal) + + override def getBinary(ordinal: Int): Array[Byte] = internalRow.getBinary(ordinal) + + override def getInterval(ordinal: Int): CalendarInterval = internalRow.getInterval(ordinal) + + override def getStruct(ordinal: Int, numFields: Int): InternalRow = internalRow.getStruct(ordinal, numFields) + + override def getArray(ordinal: Int): ArrayData = internalRow.getArray(ordinal) + + override def getMap(ordinal: Int): MapData = internalRow.getMap(ordinal) + + override def get(ordinal: Int, dataType: DataType): AnyRef = internalRow.get(ordinal, dataType) +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index b3d9e5659e8ce..d1b6df6619da2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -47,7 +47,7 @@ import java.util.Collections import javax.annotation.concurrent.NotThreadSafe import scala.collection.JavaConverters._ import scala.language.implicitConversions -import scala.util.{Failure, Success, Try} +import scala.util.{Success, Try} /** * Implementation of the [[BaseHoodieTableFileIndex]] for Spark @@ -96,7 +96,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) }) - protected lazy val shouldFastBootstrap: Boolean = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false) + protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false) private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala similarity index 85% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala index 7f494af37af97..046640c11c1ba 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala @@ -23,12 +23,15 @@ import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, SparkAdapterSup import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID +import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.FILE_FORMAT_ID import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{AtomicType, StructType} - -class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport { +/** + * This legacy parquet file format implementation to support Hudi will be replaced by + * [[NewHoodieParquetFileFormat]] in the future. 
+ */ +class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport { override def shortName(): String = FILE_FORMAT_ID @@ -55,11 +58,11 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean sparkAdapter - .createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get + .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) } } -object HoodieParquetFileFormat { +object LegacyHoodieParquetFileFormat { val FILE_FORMAT_ID = "hoodie-parquet" } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala new file mode 100644 index 0000000000000..0c1c3c8e5ee51 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
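Before the new file format class below, a short sketch of the `makeColumnarBatch` hook added to `SparkAdapter` earlier in this patch; `doBootstrapMerge` further down concatenates the column vectors of a skeleton batch and a bootstrap data batch and hands them to this hook to build the merged batch. This is only an assumed Spark 3.x shape (one-argument `ColumnarBatch` constructor plus `setNumRows`), not the actual per-version adapter code.

```scala
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object ColumnarBatchSketch {
  // Assumed Spark 3.x implementation of SparkAdapter.makeColumnarBatch.
  def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch = {
    val batch = new ColumnarBatch(vectors) // wrap the combined skeleton + data vectors
    batch.setNumRows(numRows)              // row count must match both input batches
    batch
  }
}
```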
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL, REALTIME_SKIP_MERGE_OPT_VAL} +import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile, HoodieRecord} +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation, PartitionFileSliceMapping, RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.util.SerializableConfiguration + +import scala.collection.mutable +import scala.jdk.CollectionConverters.asScalaIteratorConverter + +/** + * This class does bootstrap and MOR merging so that we can use hadoopfs relation. + */ +class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], + tableSchema: Broadcast[HoodieTableSchema], + tableName: String, + mergeType: String, + mandatoryFields: Seq[String], + isMOR: Boolean, + isBootstrap: Boolean) extends ParquetFileFormat with SparkAdapterSupport { + + override def isSplitable(sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + false + } + + //Used so that the planner only projects once and does not stack overflow + var isProjected = false + + /** + * Support batch needs to remain consistent, even if one side of a bootstrap merge can support + * while the other side can't + */ + private var supportBatchCalled = false + private var supportBatchResult = false + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + if (!supportBatchCalled) { + supportBatchCalled = true + supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema) + } + supportBatchResult + } + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + + val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) + + val requiredSchemaWithMandatory = if (!isMOR || MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) { + //add mandatory fields to required schema + val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]() + for (field <- mandatoryFields) { + if (requiredSchema.getFieldIndex(field).isEmpty) { + val fieldToAdd = dataSchema.fields(dataSchema.getFieldIndex(field).get) + added.append(fieldToAdd) + } + } + val addedFields = StructType(added.toArray) + StructType(requiredSchema.toArray ++ addedFields.fields) + } else { + dataSchema + } + + val 
requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)) + val requiredMeta = StructType(requiredSchemaSplits._1) + val requiredWithoutMeta = StructType(requiredSchemaSplits._2) + val needMetaCols = requiredMeta.nonEmpty + val needDataCols = requiredWithoutMeta.nonEmpty + // note: this is only the output of the bootstrap merge if isMOR. If it is only bootstrap then the + // output will just be outputSchema + val bootstrapReaderOutput = StructType(requiredMeta.fields ++ requiredWithoutMeta.fields) + + val skeletonReaderAppend = needMetaCols && isBootstrap && !(needDataCols || isMOR) && partitionSchema.nonEmpty + val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR && partitionSchema.nonEmpty + + val (baseFileReader, preMergeBaseFileReader, skeletonReader, bootstrapBaseReader) = buildFileReaders(sparkSession, + dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf, requiredSchemaWithMandatory, + requiredWithoutMeta, requiredMeta) + + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + (file: PartitionedFile) => { + file.partitionValues match { + case broadcast: PartitionFileSliceMapping => + val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file) + if (FSUtils.isLogFile(filePath)) { + //no base file + val fileSlice = broadcast.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get + val logFiles = getLogFilesFromSlice(fileSlice) + val outputAvroSchema = HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName) + new LogFileIterator(logFiles, filePath.getParent, tableSchema.value, outputSchema, outputAvroSchema, + tableState.value, broadcastedHadoopConf.value.value) + } else { + //We do not broadcast the slice if it has no log files or bootstrap base + broadcast.getSlice(FSUtils.getFileId(filePath.getName)) match { + case Some(fileSlice) => + val hoodieBaseFile = fileSlice.getBaseFile.get() + val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile + val partitionValues = broadcast.getInternalRow + val logFiles = getLogFilesFromSlice(fileSlice) + if (requiredSchemaWithMandatory.isEmpty) { + val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen) + baseFileReader(baseFile) + } else if (bootstrapFileOpt.isPresent) { + val bootstrapIterator = buildBootstrapIterator(skeletonReader, bootstrapBaseReader, + skeletonReaderAppend, bootstrapBaseAppend, bootstrapFileOpt.get(), hoodieBaseFile, partitionValues, + needMetaCols, needDataCols) + (isMOR, logFiles.nonEmpty) match { + case (true, true) => buildMergeOnReadIterator(bootstrapIterator, logFiles, filePath.getParent, + bootstrapReaderOutput, requiredSchemaWithMandatory, outputSchema, partitionSchema, partitionValues, + broadcastedHadoopConf.value.value) + case (true, false) => appendPartitionAndProject(bootstrapIterator, bootstrapReaderOutput, + partitionSchema, outputSchema, partitionValues) + case (false, false) => bootstrapIterator + case (false, true) => throw new IllegalStateException("should not be log files if not mor table") + } + } else { + if (logFiles.nonEmpty) { + val baseFile = createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen) + buildMergeOnReadIterator(preMergeBaseFileReader(baseFile), logFiles, filePath.getParent, requiredSchemaWithMandatory, + requiredSchemaWithMandatory, outputSchema, partitionSchema, 
partitionValues, broadcastedHadoopConf.value.value) + } else { + throw new IllegalStateException("should not be here since file slice should not have been broadcasted since it has no log or data files") + //baseFileReader(baseFile) + } + } + case _ => baseFileReader(file) + } + } + case _ => baseFileReader(file) + } + } + } + + /** + * Build file readers to read individual physical files + */ + protected def buildFileReaders(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], + hadoopConf: Configuration, requiredSchemaWithMandatory: StructType, + requiredWithoutMeta: StructType, requiredMeta: StructType): + (PartitionedFile => Iterator[InternalRow], + PartitionedFile => Iterator[InternalRow], + PartitionedFile => Iterator[InternalRow], + PartitionedFile => Iterator[InternalRow]) = { + + //file reader when you just read a hudi parquet file and don't do any merging + + val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, + filters, options, new Configuration(hadoopConf)) + + //file reader for reading a hudi base file that needs to be merged with log files + val preMergeBaseFileReader = if (isMOR) { + super.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), + requiredSchemaWithMandatory, Seq.empty, options, new Configuration(hadoopConf)) + } else { + _: PartitionedFile => Iterator.empty + } + + //Rules for appending partitions and filtering in the bootstrap readers: + // 1. if it is mor, we don't want to filter data or append partitions + // 2. if we need to merge the bootstrap base and skeleton files then we cannot filter + // 3. if we need to merge the bootstrap base and skeleton files then we should never append partitions to the + // skeleton reader + + val needMetaCols = requiredMeta.nonEmpty + val needDataCols = requiredWithoutMeta.nonEmpty + + //file reader for bootstrap skeleton files + val skeletonReader = if (needMetaCols && isBootstrap) { + if (needDataCols || isMOR) { + // no filter and no append + super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), + requiredMeta, Seq.empty, options, new Configuration(hadoopConf)) + } else { + // filter and append + super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema, + requiredMeta, filters, options, new Configuration(hadoopConf)) + } + } else { + _: PartitionedFile => Iterator.empty + } + + //file reader for bootstrap base files + val bootstrapBaseReader = if (needDataCols && isBootstrap) { + val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => isMetaField(sf.name))) + if (isMOR) { + // no filter and no append + super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta, + Seq.empty, options, new Configuration(hadoopConf)) + } else if (needMetaCols) { + // no filter but append + super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, + Seq.empty, options, new Configuration(hadoopConf)) + } else { + // filter and append + super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, + filters, options, new Configuration(hadoopConf)) + } + } else { + _: PartitionedFile => Iterator.empty + } + + (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader) + } + + /** + * Create iterator for a file slice that has bootstrap base and skeleton file + */ + protected def buildBootstrapIterator(skeletonReader: PartitionedFile => Iterator[InternalRow], + bootstrapBaseReader: PartitionedFile => Iterator[InternalRow], + skeletonReaderAppend: Boolean, bootstrapBaseAppend: Boolean, + bootstrapBaseFile: BaseFile, hoodieBaseFile: BaseFile, + partitionValues: InternalRow, needMetaCols: Boolean, + needDataCols: Boolean): Iterator[InternalRow] = { + lazy val skeletonFile = if (skeletonReaderAppend) { + createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen) + } else { + createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen) + } + + lazy val dataFile = if (bootstrapBaseAppend) { + createPartitionedFile(partitionValues, bootstrapBaseFile.getHadoopPath, 0, bootstrapBaseFile.getFileLen) + } else { + createPartitionedFile(InternalRow.empty, bootstrapBaseFile.getHadoopPath, 0, bootstrapBaseFile.getFileLen) + } + + lazy val skeletonIterator = skeletonReader(skeletonFile) + lazy val dataFileIterator = bootstrapBaseReader(dataFile) + + (needMetaCols, needDataCols) match { + case (true, true) => doBootstrapMerge(skeletonIterator, dataFileIterator) + case (true, false) => skeletonIterator + case (false, true) => dataFileIterator + case (false, false) => throw new IllegalStateException("should not be here if only partition cols are required") + } + } + + /** + * Merge skeleton and data file iterators + */ + protected def doBootstrapMerge(skeletonFileIterator: Iterator[Any], dataFileIterator: Iterator[Any]): Iterator[InternalRow] = { + new Iterator[Any] { + val combinedRow = new JoinedRow() + + override def hasNext: Boolean = { + checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, + "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") + dataFileIterator.hasNext && skeletonFileIterator.hasNext + } + + override def next(): Any = { + (skeletonFileIterator.next(), dataFileIterator.next()) match { + case (s: ColumnarBatch, d: ColumnarBatch) => + val numCols = s.numCols() + d.numCols() + val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols) + for (i <- 0 until numCols) { + if (i < s.numCols()) { + vecs(i) = s.column(i) + } else { + vecs(i) = d.column(i - s.numCols()) + } + } + assert(s.numRows() == d.numRows()) + sparkAdapter.makeColumnarBatch(vecs, s.numRows()) + case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch") + case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch") + case (s: InternalRow, d: InternalRow) => combinedRow(s, d) + } + } + }.asInstanceOf[Iterator[InternalRow]] + } + + /** + * Create iterator for a file slice that has log files + */ + protected def buildMergeOnReadIterator(iter: Iterator[InternalRow], logFiles: List[HoodieLogFile], + partitionPath: Path, inputSchema: StructType, requiredSchemaWithMandatory: StructType, + outputSchema: StructType, partitionSchema: StructType, partitionValues: InternalRow, + hadoopConf: Configuration): Iterator[InternalRow] = { + + val requiredAvroSchema = HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName) + val morIterator = mergeType match { + case REALTIME_SKIP_MERGE_OPT_VAL => throw new UnsupportedOperationException("Skip merge is not currently " + + "implemented for the New Hudi Parquet File format") + //new 
SkipMergeIterator(logFiles, partitionPath, iter, inputSchema, tableSchema.value, + // requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, hadoopConf) + case REALTIME_PAYLOAD_COMBINE_OPT_VAL => + new RecordMergingFileIterator(logFiles, partitionPath, iter, inputSchema, tableSchema.value, + requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, hadoopConf) + } + appendPartitionAndProject(morIterator, requiredSchemaWithMandatory, partitionSchema, + outputSchema, partitionValues) + } + + /** + * Append partition values to rows and project to output schema + */ + protected def appendPartitionAndProject(iter: Iterator[InternalRow], + inputSchema: StructType, + partitionSchema: StructType, + to: StructType, + partitionValues: InternalRow): Iterator[InternalRow] = { + if (partitionSchema.isEmpty) { + projectSchema(iter, inputSchema, to) + } else { + val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to) + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, partitionValues))) + } + } + + protected def projectSchema(iter: Iterator[InternalRow], + from: StructType, + to: StructType): Iterator[InternalRow] = { + val unsafeProjection = generateUnsafeProjection(from, to) + iter.map(d => unsafeProjection(d)) + } + + protected def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = { + fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 1f25493e5e954..3c2d41aa58287 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -265,6 +265,9 @@ object HoodieAnalysis extends SparkAdapterSupport { case ut @ UpdateTable(relation @ ResolvesToHudiTable(_), _, _) => ut.copy(table = relation) + + case logicalPlan: LogicalPlan if logicalPlan.resolved => + sparkAdapter.getCatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(logicalPlan) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index bbce1c61f0f26..f57be60461a1f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -18,86 +18,27 @@ package org.apache.hudi.functional; -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector; -import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; -import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; -import org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.config.HoodieBootstrapConfig; -import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.keygen.ComplexKeyGenerator; -import 
org.apache.hudi.keygen.NonpartitionedKeyGenerator; -import org.apache.hudi.keygen.SimpleKeyGenerator; -import org.apache.hudi.testutils.HoodieSparkClientTestBase; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.functions; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; -import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString; -import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY; -import static org.junit.jupiter.api.Assertions.assertEquals; /** * Tests different layouts for bootstrap base path */ @Tag("functional") -public class TestBootstrapRead extends HoodieSparkClientTestBase { - - @TempDir - public java.nio.file.Path tmpFolder; - protected String bootstrapBasePath = null; - protected String bootstrapTargetPath = null; - protected String hudiBasePath = null; - - protected static int nInserts = 100; - protected static int nUpdates = 20; - protected static String[] dashPartitionPaths = {"2016-03-14","2016-03-15", "2015-03-16", "2015-03-17"}; - protected static String[] slashPartitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"}; - protected String bootstrapType; - protected Boolean dashPartitions; - protected HoodieTableType tableType; - protected Integer nPartitions; - - protected String[] partitionCols; - protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"}; - - @BeforeEach - public void setUp() throws Exception { - bootstrapBasePath = tmpFolder.toAbsolutePath() + "/bootstrapBasePath"; - hudiBasePath = tmpFolder.toAbsolutePath() + "/hudiBasePath"; - bootstrapTargetPath = tmpFolder.toAbsolutePath() + "/bootstrapTargetPath"; - initSparkContexts(); - } - - @AfterEach - public void tearDown() throws IOException { - cleanupSparkContexts(); - cleanupClients(); - cleanupTestDataGenerator(); - } - +public class TestBootstrapRead extends TestBootstrapReadBase { private static Stream testArgs() { Stream.Builder b = Stream.builder(); String[] bootstrapType = {"full", "metadata", "mixed"}; @@ -149,193 +90,4 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy compareTables(); verifyMetaColOnlyRead(2); } - - protected Map basicOptions() { - Map options = new HashMap<>(); - options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name()); - options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); - options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); - if (nPartitions == 0) { - options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); - } else { - options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), String.join(",", partitionCols)); - if (nPartitions == 1) { - 
options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); - } else { - options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName()); - } - } - options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); - if (tableType.equals(MERGE_ON_READ)) { - options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); - } - options.put(HoodieWriteConfig.TBL_NAME.key(), "test"); - return options; - } - - protected Map setBootstrapOptions() { - Map options = basicOptions(); - options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL()); - options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath); - if (!dashPartitions) { - options.put(HoodieBootstrapConfig.PARTITION_PATH_TRANSLATOR_CLASS_NAME.key(), DecodedBootstrapPartitionPathTranslator.class.getName()); - } - switch (bootstrapType) { - case "metadata": - options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), MetadataOnlyBootstrapModeSelector.class.getName()); - break; - case "full": - options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), FullRecordBootstrapModeSelector.class.getName()); - break; - case "mixed": - options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), BootstrapRegexModeSelector.class.getName()); - String regexPattern; - if (dashPartitions) { - regexPattern = "partition_path=2015-03-1[5-7]"; - } else { - regexPattern = "partition_path=2015%2F03%2F1[5-7]"; - } - if (nPartitions > 1) { - regexPattern = regexPattern + "\\/.*"; - } - options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), regexPattern); - break; - default: - throw new RuntimeException(); - } - return options; - } - - protected void doUpdate(Map options, String instantTime) { - Dataset updates = generateTestUpdates(instantTime, nUpdates); - doUpsert(options, updates); - } - - protected void doInsert(Map options, String instantTime) { - Dataset inserts = generateTestInserts(instantTime, nUpdates); - doUpsert(options, inserts); - } - - protected void doUpsert(Map options, Dataset df) { - String nCompactCommits = "3"; - df.write().format("hudi") - .options(options) - .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits) - .mode(SaveMode.Append) - .save(hudiBasePath); - if (bootstrapType.equals("mixed")) { - // mixed tables have a commit for each of the metadata and full bootstrap modes - // so to align with the regular hudi table, we need to compact after 4 commits instead of 3 - nCompactCommits = "4"; - } - df.write().format("hudi") - .options(options) - .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits) - .mode(SaveMode.Append) - .save(bootstrapTargetPath); - } - - protected void compareTables() { - Dataset hudiDf = sparkSession.read().format("hudi").load(hudiBasePath); - Dataset bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath); - Dataset fastBootstrapDf = sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), "true").load(bootstrapTargetPath); - if (nPartitions == 0) { - compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns)); - if (tableType.equals(COPY_ON_WRITE)) { - compareDf(fastBootstrapDf.drop("city_to_state"), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path")); - } - return; - } - compareDf(hudiDf.drop(dropColumns).drop(partitionCols), bootstrapDf.drop(dropColumns).drop(partitionCols)); - 
compareDf(hudiDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols)); - if (tableType.equals(COPY_ON_WRITE)) { - compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols)); - compareDf(fastBootstrapDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols)); - } - } - - protected void verifyMetaColOnlyRead(Integer iteration) { - Dataset hudiDf = sparkSession.read().format("hudi").load(hudiBasePath).select("_hoodie_commit_time", "_hoodie_record_key"); - Dataset bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath).select("_hoodie_commit_time", "_hoodie_record_key"); - hudiDf.show(100,false); - bootstrapDf.show(100,false); - if (iteration > 0) { - assertEquals(sparkSession.sql("select * from hudi_iteration_" + (iteration - 1)).intersect(hudiDf).count(), - sparkSession.sql("select * from bootstrap_iteration_" + (iteration - 1)).intersect(bootstrapDf).count()); - } - hudiDf.createOrReplaceTempView("hudi_iteration_" + iteration); - bootstrapDf.createOrReplaceTempView("bootstrap_iteration_" + iteration); - } - - protected void compareDf(Dataset df1, Dataset df2) { - assertEquals(0, df1.except(df2).count()); - assertEquals(0, df2.except(df1).count()); - } - - protected void setupDirs() { - dataGen = new HoodieTestDataGenerator(dashPartitions ? dashPartitionPaths : slashPartitionPaths); - Dataset inserts = generateTestInserts("000", nInserts); - if (dashPartitions) { - //test adding a partition to the table - inserts = inserts.filter("partition_path != '2016-03-14'"); - } - if (nPartitions > 0) { - partitionCols = new String[nPartitions]; - partitionCols[0] = "partition_path"; - for (int i = 1; i < partitionCols.length; i++) { - partitionCols[i] = "partpath" + (i + 1); - } - inserts.write().partitionBy(partitionCols).save(bootstrapBasePath); - } else { - inserts.write().save(bootstrapBasePath); - } - - inserts.write().format("hudi") - .options(basicOptions()) - .mode(SaveMode.Overwrite) - .save(hudiBasePath); - } - - protected Dataset makeInsertDf(String instantTime, Integer n) { - List records = dataGen.generateInserts(instantTime, n).stream() - .map(r -> recordToString(r).get()).collect(Collectors.toList()); - JavaRDD rdd = jsc.parallelize(records); - return sparkSession.read().json(rdd); - } - - protected Dataset generateTestInserts(String instantTime, Integer n) { - return addPartitionColumns(makeInsertDf(instantTime, n), nPartitions); - } - - protected Dataset makeUpdateDf(String instantTime, Integer n) { - try { - List records = dataGen.generateUpdates(instantTime, n).stream() - .map(r -> recordToString(r).get()).collect(Collectors.toList()); - JavaRDD rdd = jsc.parallelize(records); - return sparkSession.read().json(rdd); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - protected Dataset generateTestUpdates(String instantTime, Integer n) { - return addPartitionColumns(makeUpdateDf(instantTime, n), nPartitions); - } - - private static Dataset addPartitionColumns(Dataset df, Integer nPartitions) { - if (nPartitions < 2) { - return df; - } - for (int i = 2; i <= nPartitions; i++) { - df = applyPartition(df, i); - } - return df; - } - - private static Dataset applyPartition(Dataset df, Integer n) { - return df.withColumn("partpath" + n, - functions.md5(functions.concat_ws("," + n + ",", - df.col("partition_path"), - functions.hash(df.col("_row_key")).mod(n)))); - } } diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java new file mode 100644 index 0000000000000..d3246f7c4da83 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector; +import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieBootstrapConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.testutils.HoodieSparkClientTestBase; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.functions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT; +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; +import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; +import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString; +import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("functional") +public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase { + + @TempDir + public java.nio.file.Path tmpFolder; + protected String bootstrapBasePath = null; + protected String bootstrapTargetPath = null; + protected String hudiBasePath = null; + + protected static int nInserts = 100; + protected static int nUpdates = 20; + protected static String[] dashPartitionPaths = {"2016-03-14","2016-03-15", 
"2015-03-16", "2015-03-17"}; + protected static String[] slashPartitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"}; + protected String bootstrapType; + protected Boolean dashPartitions; + protected HoodieTableType tableType; + protected Integer nPartitions; + + protected String[] partitionCols; + protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"}; + + @BeforeEach + public void setUp() throws Exception { + bootstrapBasePath = tmpFolder.toAbsolutePath() + "/bootstrapBasePath"; + hudiBasePath = tmpFolder.toAbsolutePath() + "/hudiBasePath"; + bootstrapTargetPath = tmpFolder.toAbsolutePath() + "/bootstrapTargetPath"; + initSparkContexts(); + } + + @AfterEach + public void tearDown() throws IOException { + cleanupSparkContexts(); + cleanupClients(); + cleanupTestDataGenerator(); + } + + protected Map basicOptions() { + Map options = new HashMap<>(); + options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name()); + options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); + options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + if (nPartitions == 0) { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); + } else { + options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), String.join(",", partitionCols)); + if (nPartitions == 1) { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); + } else { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName()); + } + } + options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); + if (tableType.equals(MERGE_ON_READ)) { + options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); + } + options.put(HoodieWriteConfig.TBL_NAME.key(), "test"); + return options; + } + + protected Map setBootstrapOptions() { + Map options = basicOptions(); + options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL()); + options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath); + if (!dashPartitions) { + options.put(HoodieBootstrapConfig.PARTITION_PATH_TRANSLATOR_CLASS_NAME.key(), DecodedBootstrapPartitionPathTranslator.class.getName()); + } + switch (bootstrapType) { + case "metadata": + options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), MetadataOnlyBootstrapModeSelector.class.getName()); + break; + case "full": + options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), FullRecordBootstrapModeSelector.class.getName()); + break; + case "mixed": + options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), BootstrapRegexModeSelector.class.getName()); + String regexPattern; + if (dashPartitions) { + regexPattern = "partition_path=2015-03-1[5-7]"; + } else { + regexPattern = "partition_path=2015%2F03%2F1[5-7]"; + } + if (nPartitions > 1) { + regexPattern = regexPattern + "\\/.*"; + } + options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), regexPattern); + break; + default: + throw new RuntimeException(); + } + return options; + } + + protected void doUpdate(Map options, String instantTime) { + Dataset updates = generateTestUpdates(instantTime, nUpdates); + doUpsert(options, updates); + } + + protected void doInsert(Map options, String instantTime) { + Dataset inserts = generateTestInserts(instantTime, nUpdates); + doUpsert(options, inserts); + } + + 
protected void doDelete(Map options, String instantTime) { + Dataset deletes = generateTestDeletes(instantTime, nUpdates); + doUpsert(options, deletes); + } + + protected void doUpsert(Map options, Dataset df) { + String nCompactCommits = "3"; + df.write().format("hudi") + .options(options) + .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits) + .mode(SaveMode.Append) + .save(hudiBasePath); + if (bootstrapType.equals("mixed")) { + // mixed tables have a commit for each of the metadata and full bootstrap modes + // so to align with the regular hudi table, we need to compact after 4 commits instead of 3 + nCompactCommits = "4"; + } + df.write().format("hudi") + .options(options) + .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits) + .mode(SaveMode.Append) + .save(bootstrapTargetPath); + } + + protected void compareTables() { + Dataset hudiDf = sparkSession.read().format("hudi").load(hudiBasePath); + Dataset bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath); + Dataset fastBootstrapDf = sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), "true").load(bootstrapTargetPath); + boolean shouldTestFastBootstrap = tableType.equals(COPY_ON_WRITE) && !Boolean.parseBoolean(USE_NEW_HUDI_PARQUET_FILE_FORMAT().defaultValue()); + if (nPartitions == 0) { + compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns)); + if (shouldTestFastBootstrap) { + compareDf(fastBootstrapDf.drop("city_to_state"), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path")); + } + return; + } + compareDf(hudiDf.drop(dropColumns).drop(partitionCols), bootstrapDf.drop(dropColumns).drop(partitionCols)); + compareDf(hudiDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols)); + if (shouldTestFastBootstrap) { + compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols)); + compareDf(fastBootstrapDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols)); + } + } + + protected void verifyMetaColOnlyRead(Integer iteration) { + Dataset hudiDf = sparkSession.read().format("hudi").load(hudiBasePath).select("_hoodie_commit_time", "_hoodie_record_key"); + Dataset bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath).select("_hoodie_commit_time", "_hoodie_record_key"); + hudiDf.show(100,false); + bootstrapDf.show(100,false); + if (iteration > 0) { + assertEquals(sparkSession.sql("select * from hudi_iteration_" + (iteration - 1)).intersect(hudiDf).count(), + sparkSession.sql("select * from bootstrap_iteration_" + (iteration - 1)).intersect(bootstrapDf).count()); + } + hudiDf.createOrReplaceTempView("hudi_iteration_" + iteration); + bootstrapDf.createOrReplaceTempView("bootstrap_iteration_" + iteration); + } + + protected void compareDf(Dataset df1, Dataset df2) { + assertEquals(0, df1.except(df2).count()); + assertEquals(0, df2.except(df1).count()); + } + + protected void setupDirs() { + dataGen = new HoodieTestDataGenerator(dashPartitions ? 
dashPartitionPaths : slashPartitionPaths); + Dataset inserts = generateTestInserts("000", nInserts); + if (dashPartitions) { + //test adding a partition to the table + inserts = inserts.filter("partition_path != '2016-03-14'"); + } + if (nPartitions > 0) { + partitionCols = new String[nPartitions]; + partitionCols[0] = "partition_path"; + for (int i = 1; i < partitionCols.length; i++) { + partitionCols[i] = "partpath" + (i + 1); + } + inserts.write().partitionBy(partitionCols).save(bootstrapBasePath); + } else { + inserts.write().save(bootstrapBasePath); + } + + inserts.write().format("hudi") + .options(basicOptions()) + .mode(SaveMode.Overwrite) + .save(hudiBasePath); + } + + protected Dataset makeDeleteDf(String instantTime, Integer n) { + List records = dataGen.generateUniqueDeleteRecords(instantTime, n).stream() + .map(r -> recordToString(r).get()).collect(Collectors.toList()); + JavaRDD rdd = jsc.parallelize(records); + return sparkSession.read().json(rdd); + } + + protected Dataset generateTestDeletes(String instantTime, Integer n) { + return addPartitionColumns(makeDeleteDf(instantTime, n), nPartitions); + } + + protected Dataset makeInsertDf(String instantTime, Integer n) { + List records = dataGen.generateInserts(instantTime, n).stream() + .map(r -> recordToString(r).get()).collect(Collectors.toList()); + JavaRDD rdd = jsc.parallelize(records); + return sparkSession.read().json(rdd); + } + + protected Dataset generateTestInserts(String instantTime, Integer n) { + return addPartitionColumns(makeInsertDf(instantTime, n), nPartitions); + } + + protected Dataset makeUpdateDf(String instantTime, Integer n) { + try { + List records = dataGen.generateUpdates(instantTime, n).stream() + .map(r -> recordToString(r).get()).collect(Collectors.toList()); + JavaRDD rdd = jsc.parallelize(records); + return sparkSession.read().json(rdd); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected Dataset generateTestUpdates(String instantTime, Integer n) { + return addPartitionColumns(makeUpdateDf(instantTime, n), nPartitions); + } + + protected static Dataset addPartitionColumns(Dataset df, Integer nPartitions) { + if (nPartitions < 2) { + return df; + } + for (int i = 2; i <= nPartitions; i++) { + df = applyPartition(df, i); + } + return df; + } + + protected static Dataset applyPartition(Dataset df, Integer n) { + return df.withColumn("partpath" + n, + functions.md5(functions.concat_ws("," + n + ",", + df.col("partition_path"), + functions.hash(df.col("_row_key")).mod(n)))); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java new file mode 100644 index 0000000000000..ef6814f21c5c2 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.common.model.HoodieTableType; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; +import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("functional") +public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase { + + private static Stream testArgs() { + Stream.Builder b = Stream.builder(); + HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ}; + Integer[] nPartitions = {0, 1, 2}; + for (HoodieTableType tt : tableType) { + for (Integer n : nPartitions) { + b.add(Arguments.of(tt, n)); + } + } + return b.build(); + } + + @ParameterizedTest + @MethodSource("testArgs") + public void runTests(HoodieTableType tableType, Integer nPartitions) { + this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed"; + this.dashPartitions = true; + this.tableType = tableType; + this.nPartitions = nPartitions; + setupDirs(); + + //do bootstrap + Map options = setBootstrapOptions(); + Dataset bootstrapDf = sparkSession.emptyDataFrame(); + bootstrapDf.write().format("hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(bootstrapTargetPath); + runComparisons(); + + options = basicOptions(); + doUpdate(options, "001"); + runComparisons(); + + doInsert(options, "002"); + runComparisons(); + + doDelete(options, "003"); + runComparisons(); + } + + protected void runComparisons() { + if (tableType.equals(MERGE_ON_READ)) { + runComparison(hudiBasePath); + } + runComparison(bootstrapTargetPath); + } + + protected void runComparison(String tableBasePath) { + testCount(tableBasePath); + runIndividualComparison(tableBasePath); + runIndividualComparison(tableBasePath, "partition_path"); + runIndividualComparison(tableBasePath, "_hoodie_record_key", "_hoodie_commit_time", "_hoodie_partition_path"); + runIndividualComparison(tableBasePath, "_hoodie_commit_time", "_hoodie_commit_seqno"); + runIndividualComparison(tableBasePath, "_hoodie_commit_time", "_hoodie_commit_seqno", "partition_path"); + runIndividualComparison(tableBasePath, "_row_key", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path"); + runIndividualComparison(tableBasePath, "_row_key", "partition_path", "_hoodie_is_deleted", "begin_lon"); + } + + protected void testCount(String tableBasePath) { + Dataset legacyDf = sparkSession.read().format("hudi") + .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false").load(tableBasePath); + Dataset fileFormatDf = sparkSession.read().format("hudi") + .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"true").load(tableBasePath); + assertEquals(legacyDf.count(), fileFormatDf.count()); + } + + protected scala.collection.Seq seq(String... a) { + return scala.collection.JavaConverters.asScalaBufferConverter(Arrays.asList(a)).asScala().toSeq(); + } + + protected void runIndividualComparison(String tableBasePath) { + runIndividualComparison(tableBasePath, ""); + } + + protected void runIndividualComparison(String tableBasePath, String firstColumn, String... columns) { + Dataset legacyDf = sparkSession.read().format("hudi") + .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false").load(tableBasePath); + Dataset fileFormatDf = sparkSession.read().format("hudi") + .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true").load(tableBasePath); + if (firstColumn.isEmpty()) { + //df.except(df) does not work with map type cols + legacyDf = legacyDf.drop("city_to_state"); + fileFormatDf = fileFormatDf.drop("city_to_state"); + } else { + if (columns.length > 0) { + legacyDf = legacyDf.select(firstColumn, columns); + fileFormatDf = fileFormatDf.select(firstColumn, columns); + } else { + legacyDf = legacyDf.select(firstColumn); + fileFormatDf = fileFormatDf.select(firstColumn); + } + } + compareDf(legacyDf, fileFormatDf); + } +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 9f551a0bf1d92..cdb4c5226a696 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -17,13 +17,17 @@ package org.apache.spark.sql +import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan, MergeIntoTable} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, ExplainCommand} +import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.internal.SQLConf object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { @@ -84,4 +88,14 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan = { Join(left, right, joinType, condition) } + + override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = { + plan match { + case p@PhysicalOperation(_, _, + l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected => + fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true + 
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), p) + case _ => plan + } + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index cc2e25f989149..ec275a1d3fdc2 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -33,13 +33,14 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, Join, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat} import org.apache.spark.sql.execution.vectorized.MutableColumnarRow import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.parser.HoodieExtendedParserInterface import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ @@ -143,8 +144,8 @@ class Spark2Adapter extends SparkAdapter { partitions.toSeq } - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { - Some(new Spark24HoodieParquetFileFormat(appendPartitionValues)) + override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark24LegacyHoodieParquetFileFormat(appendPartitionValues)) } override def createInterpretedPredicate(e: Expression): InterpretedPredicate = { @@ -200,4 +201,10 @@ class Spark2Adapter extends SparkAdapter { DataSourceStrategy.translateFilter(predicate) } + + override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch = { + val batch = new ColumnarBatch(vectors) + batch.setNumRows(numRows) + batch + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala index c168911302eef..a6a4ab943a1d9 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala @@ -50,7 +50,7 @@ import java.net.URI *
 *   <li>Avoiding appending partition values to the rows read from the data file</li>
  • * */ -class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { +class Spark24LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { override def buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index ce9499ae7d2a9..b2a9a529511ec 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import org.apache.spark.storage.StorageLevel import java.time.ZoneId @@ -97,4 +98,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { supportNestedPredicatePushdown: Boolean = false): Option[Filter] = { DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown) } + + override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch = { + new ColumnarBatch(vectors, numRows) + } } diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala index aefc6af7e3e7e..e9757c821d988 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala @@ -18,12 +18,16 @@ package org.apache.spark.sql +import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable} +import org.apache.spark.sql.catalyst.planning.ScanOperation +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand +import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.types.StructType object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -42,6 +46,16 @@ object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { } } + override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = { + plan match { + case s@ScanOperation(_, _, + l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected => + 
fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true + Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s) + case _ => plan + } + } + override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema) override def isRepairTable(plan: LogicalPlan): Boolean = { diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala index 5abd46948eb9a..22a9f090fb33e 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30HoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark30PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile} import org.apache.spark.sql.hudi.SparkAdapter @@ -84,8 +84,8 @@ class Spark3_0Adapter extends BaseSpark3Adapter { override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface = new HoodieSpark3_0ExtendedSqlParser(spark, delegate) - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { - Some(new Spark30HoodieParquetFileFormat(appendPartitionValues)) + override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark30LegacyHoodieParquetFileFormat(appendPartitionValues)) } override def createHoodieFileScanRDD(sparkSession: SparkSession, diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala index 4c33ac896770f..de0be0db04b3b 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} 
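// For orientation only: the GenerateUnsafeProjection/JoinedRow imports above are the machinery
// used here (and by appendPartitionAndProject in NewHoodieParquetFileFormat) to append partition
// values onto data rows before projecting to the output schema. A rough sketch, assuming
// `requiredSchema`/`partitionSchema` are StructTypes and `partitionValues` is an InternalRow:
//
//   val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
//   val appendPartitions = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
//   val joinedRow = new JoinedRow()
//   iter.map(row => appendPartitions(joinedRow(row, partitionValues)))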
import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.Spark30HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} +import org.apache.spark.sql.execution.datasources.parquet.Spark30LegacyHoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -61,7 +61,7 @@ import java.net.URI *
 *   <li>Schema on-read</li>
  • * */ -class Spark30HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { +class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { override def buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, @@ -336,7 +336,7 @@ class Spark30HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } } -object Spark30HoodieParquetFileFormat { +object Spark30LegacyHoodieParquetFileFormat { def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala index 2fcc88d4b3725..df94529ce12b4 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala @@ -18,12 +18,16 @@ package org.apache.spark.sql +import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable} +import org.apache.spark.sql.catalyst.planning.ScanOperation +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand +import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.types.StructType object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -42,6 +46,16 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { } } + override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = { + plan match { + case s@ScanOperation(_, _, + l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected => + fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true + Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s) + case _ => plan + } + } + override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema) override def isRepairTable(plan: LogicalPlan): Boolean = { diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 547309c0e1096..8ca072333d0e3 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -30,7 +30,7 @@ 
import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark31PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile} import org.apache.spark.sql.hudi.SparkAdapter @@ -85,8 +85,8 @@ class Spark3_1Adapter extends BaseSpark3Adapter { override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface = new HoodieSpark3_1ExtendedSqlParser(spark, delegate) - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { - Some(new Spark31HoodieParquetFileFormat(appendPartitionValues)) + override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark31LegacyHoodieParquetFileFormat(appendPartitionValues)) } override def createHoodieFileScanRDD(sparkSession: SparkSession, diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala index a90d36a02de77..2d84400750683 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} +import org.apache.spark.sql.execution.datasources.parquet.Spark31LegacyHoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -61,7 +61,7 @@ import java.net.URI *
 *   <li>Schema on-read</li>
  • * */ -class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { +class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { override def buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, @@ -343,7 +343,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } } -object Spark31HoodieParquetFileFormat { +object Spark31LegacyHoodieParquetFileFormat { def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala index f76128d4f81b8..d4624625d7537 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala @@ -19,14 +19,18 @@ package org.apache.spark.sql import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable} +import org.apache.spark.sql.catalyst.planning.ScanOperation +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.RepairTableCommand +import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.types.StructType object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -45,6 +49,16 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { } } + override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = { + plan match { + case s@ScanOperation(_, _, + l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected => + fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true + Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s) + case _ => plan + } + } + override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = { val klass = classOf[ProjectionOverSchema] checkArgument(klass.getConstructors.length == 1) diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index f07d0ccdc631d..3a5812a5faa40 100644 --- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan} import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32LegacyHoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark32PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_2ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} @@ -84,8 +84,8 @@ class Spark3_2Adapter extends BaseSpark3Adapter { override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface = new HoodieSpark3_2ExtendedSqlParser(spark, delegate) - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { - Some(new Spark32HoodieParquetFileFormat(appendPartitionValues)) + override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark32LegacyHoodieParquetFileFormat(appendPartitionValues)) } override def createHoodieFileScanRDD(sparkSession: SparkSession, diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala index f6eb5da13b509..c88c35b5eeb4e 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._ +import org.apache.spark.sql.execution.datasources.parquet.Spark32LegacyHoodieParquetFileFormat._ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.sources._ @@ -60,7 +60,7 @@ import java.net.URI *
 *   <li>Schema on-read</li>
  • * */ -class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { +class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { override def buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, @@ -172,7 +172,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo // and unfortunately won't compile against Spark 3.2.0 // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 val datetimeRebaseSpec = - DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) new ParquetFilters( parquetSchema, pushDownDate, @@ -271,7 +271,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo // and unfortunately won't compile against Spark 3.2.0 // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 val int96RebaseSpec = - DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) new VectorizedParquetRecordReader( @@ -409,7 +409,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } } -object Spark32HoodieParquetFileFormat { +object Spark32LegacyHoodieParquetFileFormat { /** * NOTE: This method is specific to Spark 3.2.0 @@ -514,4 +514,3 @@ object Spark32HoodieParquetFileFormat { } } } - diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala index a294dcfdf4b1b..16f2517d128ed 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala @@ -18,13 +18,17 @@ package org.apache.spark.sql +import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable} +import org.apache.spark.sql.catalyst.planning.ScanOperation +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.RepairTableCommand +import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.types.StructType object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -43,6 +47,16 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { } } + override def applyNewHoodieParquetFileFormatProjection(plan: 
LogicalPlan): LogicalPlan = { + plan match { + case s@ScanOperation(_, _, + l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected => + fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true + Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s) + case _ => plan + } + } + override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema, output) diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala index 651027f932d46..e3d2cc9cd185e 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33HoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33LegacyHoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark33PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_3ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} @@ -85,8 +85,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter { override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface = new HoodieSpark3_3ExtendedSqlParser(spark, delegate) - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { - Some(new Spark33HoodieParquetFileFormat(appendPartitionValues)) + override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark33LegacyHoodieParquetFileFormat(appendPartitionValues)) } override def createHoodieFileScanRDD(sparkSession: SparkSession, diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala index 1e60f2ae96886..de6cbff90ca54 100644 --- 
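The projection added above compensates for the fact that Spark file sources always emit partition columns after the data columns, so a table whose partition field is declared mid-schema would come back reordered from a plain scan. A minimal, self-contained sketch of that behavior (the path, column names and data are illustrative only, not part of this patch); at the DataFrame level, the equivalent of the new Project node is simply re-selecting the declared column order:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitionColumnOrderSketch extends App {
  // Illustrative only: the table layout, path and column names are made up.
  val spark = SparkSession.builder().master("local[1]").appName("partition-order-sketch").getOrCreate()
  import spark.implicits._

  // Declared order puts the partition column ("region") in the middle.
  val declaredOrder = Seq("id", "region", "fare")
  Seq((1, "us", 10.0), (2, "eu", 20.0)).toDF(declaredOrder: _*)
    .write.mode("overwrite").partitionBy("region").parquet("/tmp/partition_order_sketch")

  val scanned = spark.read.parquet("/tmp/partition_order_sketch")
  println(scanned.columns.mkString(","))    // prints "id,fare,region": the partition column is appended last

  // The rule above applies the analogous fix on the logical plan by wrapping the scan
  // in a Project over the file index schema; on a DataFrame it amounts to:
  val reordered = scanned.select(declaredOrder.map(col): _*)
  println(reordered.columns.mkString(","))  // prints "id,region,fare"

  spark.stop()
}

The isProjected flag that the rule checks and then sets keeps the projection from being applied twice to the same relation.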
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 651027f932d46..e3d2cc9cd185e 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark33PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_3ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
@@ -85,8 +85,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_3ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
-    Some(new Spark33HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark33LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 1e60f2ae96886..de6cbff90ca54 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark33HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.parquet.Spark33LegacyHoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -62,7 +62,7 @@ import java.net.URI
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -411,7 +411,7 @@ class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark33HoodieParquetFileFormat {
+object Spark33LegacyHoodieParquetFileFormat {
 
   /**
    * NOTE: This method is specific to Spark 3.2.0
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 302fba45590fc..947a73285f5b9 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
+import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
     }
   }
 
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case s@ScanOperation(_, _, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true
+        Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s)
+      case _ => plan
+    }
+  }
+
   override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
     ProjectionOverSchema(schema, output)
 
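The Spark 3.4 variant above differs from the Spark 3.3 one only in the arity of the ScanOperation extractor (four components instead of three); the projection logic is identical. A hedged usage sketch of how the new method could be exercised (the table path, and the assumption that the scan actually goes through NewHoodieParquetFileFormat, are illustrative and not part of this patch):

import org.apache.spark.sql.{HoodieSpark34CatalystPlanUtils, SparkSession}

object ProjectionRuleSketch extends App {
  // Assumes /tmp/hudi_trips is a Hudi table read via NewHoodieParquetFileFormat
  // whose partition column is not the last field of the declared schema.
  val spark = SparkSession.builder().master("local[1]").appName("projection-rule-sketch").getOrCreate()

  val analyzed = spark.read.format("hudi").load("/tmp/hudi_trips").queryExecution.analyzed
  val projected = HoodieSpark34CatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(analyzed)

  // When the relation matches (NewHoodieParquetFileFormat, not yet projected), the result is a
  // Project over the scan that restores the declared column order; otherwise the plan is unchanged.
  println(projected.schema.fieldNames.mkString(","))

  spark.stop()
}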
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 3bd8256c3aadf..0ae5ef3dbf34a 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -29,14 +29,14 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark34HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark34LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark34PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_4ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatchRow
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSchemaUtils, HoodieSpark34CatalogUtils, HoodieSpark34CatalystExpressionUtils, HoodieSpark34CatalystPlanUtils, HoodieSpark34SchemaUtils, HoodieSpark3CatalogUtils, SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
@@ -85,8 +85,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_4ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
-    Some(new Spark34HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark34LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 4bd3e2fc3458d..6de8ded06ec00 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.WholeStageCodegenExec
-import org.apache.spark.sql.execution.datasources.parquet.Spark34HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -57,7 +57,7 @@ import org.apache.spark.util.SerializableConfiguration
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
@@ -427,7 +427,7 @@ class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark34HoodieParquetFileFormat {
+object Spark34LegacyHoodieParquetFileFormat {
 
   /**
    * NOTE: This method is specific to Spark 3.2.0
diff --git a/rfc/README.md b/rfc/README.md
index 544e1c83de933..0c5475233de33 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -107,4 +107,4 @@ The list of all RFCs can be found here.
 | 69 | [Hudi 1.x](./rfc-69/rfc-69.md) | `UNDER REVIEW` |
 | 70 | [Hudi Reverse Streamer](./rfc/rfc-70/rfc-70.md) | `UNDER REVIEW` |
 | 71 | [Enhance OCC conflict detection](./rfc/rfc-71/rfc-71.md) | `UNDER REVIEW` |
-| 72 | [Redesign Hudi-Spark Integration](./rfc/rfc-72/rfc-72.md) | `IN PROGRESS` |
\ No newline at end of file
+| 72 | [Redesign Hudi-Spark Integration](./rfc/rfc-72/rfc-72.md) | `ONGOING` |
\ No newline at end of file