diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index cd01e4fd5a065..cd30528798d66 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -45,7 +45,7 @@ trait SparkAdapter extends Serializable { * Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating * on Catalyst [[Expression]]s */ - def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils + def getCatalystExpressionUtils: HoodieCatalystExpressionUtils /** * Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index b7033c3bfc31c..416e91800f71a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,11 +20,13 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.projectReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -68,17 +70,18 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, } protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieUnsafeRDD = { + filters: Array[Filter]): RDD[InternalRow] = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) val baseFileReader = createBaseFileReader( spark = sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it @@ -86,7 +89,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) - new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits) + // NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller. + // This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the + // data file, but instead would be parsed from the partition path. In that case output of the file-reader will have + // different ordering of the fields than the original required schema (for more details please check out + // [[ParquetFileFormat]] impl). 
In that case we have to project the rows from the file-reader's schema + // back into the one expected by the caller + val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema) + + new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits) } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index ff6515db325ac..5274f257e15b2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -45,7 +45,7 @@ import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} @@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, def canPruneRelationSchema: Boolean = (fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) && // NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning) - // TODO(HUDI-XXX) internal schema doesn't supported nested schema pruning currently + // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently !hasSchemaOnRead override def schema: StructType = { @@ -334,38 +334,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt) val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema)) - // Since schema requested by the caller might contain partition columns, we might need to - // prune it, removing all partition columns from it in case these columns are not persisted - // in the data files - // - // NOTE: This partition schema is only relevant to file reader to be able to embed - // values of partition columns (hereafter referred to as partition values) encoded into - // the partition path, and omitted from the data file, back into fetched rows; - // Note that, by default, partition columns are not omitted therefore specifying - // partition schema for reader is not required - val (partitionSchema, dataSchema, requiredDataSchema) = - tryPrunePartitionColumns(tableSchema, requiredSchema) - if (fileSplits.isEmpty) { sparkSession.sparkContext.emptyRDD } else { - val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters) - - // NOTE: In case when partition columns have been pruned from the required schema, we have to project - // the rows from the pruned schema back into the one expected by the caller - val projectedRDD = if (requiredDataSchema.structTypeSchema != requiredSchema.structTypeSchema) { - rdd.mapPartitions { it => - val fullPrunedSchema = 
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) - val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema) - it.map(unsafeProjection) - } - } else { - rdd - } + val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters) // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] // Please check [[needConversion]] scala-doc for more details - projectedRDD.asInstanceOf[RDD[Row]] + rdd.asInstanceOf[RDD[Row]] } } @@ -373,19 +349,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied * * @param fileSplits file splits to be handled by the RDD - * @param partitionSchema target table's partition schema - * @param dataSchema target table's data files' schema + * @param tableSchema target table's schema * @param requiredSchema projected schema required by the reader * @param requestedColumns columns requested by the query * @param filters data filters to be applied - * @return instance of RDD (implementing [[HoodieUnsafeRDD]]) + * @return instance of RDD (holding [[InternalRow]]s) */ protected def composeRDD(fileSplits: Seq[FileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieUnsafeRDD + filters: Array[Filter]): RDD[InternalRow] /** * Provided with partition and date filters collects target file splits to read records from, while @@ -553,7 +527,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def createBaseFileReader(spark: SparkSession, partitionSchema: StructType, dataSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): BaseFileReader = { @@ -564,42 +538,56 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // we have to eagerly initialize all of the readers even though only one specific to the type // of the file being read will be used. 
This is required to avoid serialization of the whole // relation (containing file-index for ex) and passing it to the executor - val reader = tableBaseFileFormat match { - case HoodieFileFormat.PARQUET => - HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = spark, - dataSchema = dataSchema.structTypeSchema, - partitionSchema = partitionSchema, - requiredSchema = requiredSchema.structTypeSchema, - filters = filters, - options = options, - hadoopConf = hadoopConf, - // We're delegating to Spark to append partition values to every row only in cases - // when these corresponding partition-values are not persisted w/in the data file itself - appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath - ) + val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) = + tableBaseFileFormat match { + case HoodieFileFormat.PARQUET => + val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = spark, + dataSchema = dataSchema.structTypeSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredDataSchema.structTypeSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf, + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath + ) + // Since partition values by default are omitted, and not persisted w/in data-files by Spark, + // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading + // the data. As such, actual full schema produced by such reader is composed of + // a) Data-file schema (projected or not) + // b) Appended partition column values + val readerSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) + + (parquetReader, readerSchema) case HoodieFileFormat.HFILE => - createHFileReader( + val hfileReader = createHFileReader( spark = spark, dataSchema = dataSchema, - requiredSchema = requiredSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = options, hadoopConf = hadoopConf ) + (hfileReader, requiredDataSchema.structTypeSchema) + case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") } - partitionedFile => { - val extension = FSUtils.getFileExtension(partitionedFile.filePath) - if (tableBaseFileFormat.getFileExtension.equals(extension)) { - reader.apply(partitionedFile) - } else { - throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") - } - } + BaseFileReader( + read = partitionedFile => { + val extension = FSUtils.getFileExtension(partitionedFile.filePath) + if (tableBaseFileFormat.getFileExtension.equals(extension)) { + read(partitionedFile) + } else { + throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") + } + }, + schema = schema + ) } protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = { @@ -615,8 +603,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, conf } - private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { + protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = { + // Since schema requested by the caller might contain partition columns, we might need to + // prune it, removing all partition columns from it in case these columns are not persisted + // in the data files + // + // NOTE: This partition schema is only relevant to file reader to be able to embed + // values of partition columns (hereafter referred to as partition values) encoded into + // the partition path, and omitted from the data file, back into fetched rows; + // Note that, by default, partition columns are not omitted therefore specifying + // partition schema for reader is not required if (shouldExtractPartitionValuesFromPartitionPath) { val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) @@ -645,10 +642,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, object HoodieBaseRelation extends SparkAdapterSupport { - type BaseFileReader = PartitionedFile => Iterator[InternalRow] + case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], val schema: StructType) { + def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file) + } - private def generateUnsafeProjection(from: StructType, to: StructType) = - sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to) + def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = + sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to) def convertToAvroSchema(structSchema: StructType): Schema = sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record") @@ -656,6 +655,32 @@ object HoodieBaseRelation extends SparkAdapterSupport { def getPartitionPath(fileStatus: FileStatus): Path = fileStatus.getPath.getParent + /** + * Projects provided file reader's output from its original schema, into a [[requiredSchema]] + * + * NOTE: [[requiredSchema]] has to be a proper subset of the file reader's schema + * + * @param reader file reader to be projected + * @param requiredSchema target schema for the output of the provided file reader + */ + def projectReader(reader: BaseFileReader, requiredSchema: StructType): BaseFileReader = { + checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size == requiredSchema.size) + + if (reader.schema == requiredSchema) { + reader + } else { + val read = reader.apply(_) + val projectedRead: PartitionedFile => Iterator[InternalRow] = (file: PartitionedFile) => { + // NOTE: Projection is not a serializable object, hence it creation should only happen w/in + // the executor process + val unsafeProjection = generateUnsafeProjection(reader.schema, requiredSchema) + read(file).map(unsafeProjection) + } + + BaseFileReader(projectedRead, requiredSchema) + } + } + /** * Projects provided schema by picking only required (projected) top-level columns from it * @@ -666,7 +691,6 @@ object HoodieBaseRelation extends SparkAdapterSupport { tableSchema match { case Right(internalSchema) => checkState(!internalSchema.isEmptySchema) - // TODO extend pruning to leverage optimizer pruned schema val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava) val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema") val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) @@ -691,10 +715,10 @@ 
object HoodieBaseRelation extends SparkAdapterSupport { private def createHFileReader(spark: SparkSession, dataSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], - hadoopConf: Configuration): BaseFileReader = { + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -703,10 +727,10 @@ object HoodieBaseRelation extends SparkAdapterSupport { val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), new CacheConfig(hadoopConf)) - val requiredRowSchema = requiredSchema.structTypeSchema + val requiredRowSchema = requiredDataSchema.structTypeSchema // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable // to be passed from driver to executor - val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) + val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr) val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) reader.getRecordIterator(requiredAvroSchema).asScala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 6c721723c50a3..8bd295c7f3db4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -41,15 +41,17 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { /** * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]], * when Parquet's Vectorized Reader is used + * + * TODO move to HoodieBaseRelation, make private */ - def buildHoodieParquetReader(sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration, - appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { + private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration, + appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( sparkSession = sparkSession, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala index a176626f76421..4b7a09795a2e1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P case class HoodieBaseFileSplit(filePartition: 
FilePartition) extends HoodieFileSplit class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, - readFunction: PartitionedFile => Iterator[InternalRow], + read: PartitionedFile => Iterator[InternalRow], @transient fileSplits: Seq[HoodieBaseFileSplit]) - extends FileScanRDD(sparkSession, readFunction, fileSplits.map(_.filePartition)) + extends FileScanRDD(sparkSession, read, fileSplits.map(_.filePartition)) with HoodieUnsafeRDD { override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index c4c70cb414e32..512c97806f31d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -23,9 +23,10 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection, projectReader} import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} -import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability} +import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals +import org.apache.hudi.HoodieMergeOnReadRDD._ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils @@ -43,8 +44,6 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.HoodieAvroDeserializer import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} -import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.types.StructType import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} @@ -56,14 +55,42 @@ import scala.util.Try case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition -case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader, - requiredSchemaFileReaderForMerging: BaseFileReader, - requiredSchemaFileReaderForNoMerging: BaseFileReader) +/** + * Class holding base-file readers for 3 different use-cases: + * + *
+ *   1. Full-schema reader: is used when the whole row has to be read to perform merging correctly.
+ *      This could occur when no optimizations could be applied and we have to fall back to reading the whole
+ *      row from the base file and the corresponding delta-log file to merge them correctly
+ *
+ *   2. Required-schema reader: is used when it's fine to read only the row's projected columns.
+ *      This could occur when the row could be merged with the corresponding delta-log record while leveraging
+ *      only the projected columns
+ *
+ *   3. Required-schema reader (skip-merging): is used when no merging will be performed (skip-merged).
+ *      This could occur when the file-group has no delta-log files
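+ *
+ * NOTE: Which of these readers is applied to a particular file-split is decided in
+ *       [[HoodieMergeOnReadRDD]]: splits bearing no delta-log files go through the skip-merging reader,
+ *       while splits that do require merging pick either the full-schema or the required-schema reader
+ *       (see `pickBaseFileReader` below), depending on the record payload class being used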
+ */ +private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: BaseFileReader, + requiredSchemaReader: BaseFileReader, + requiredSchemaReaderSkipMerging: BaseFileReader) +/** + * RDD enabling Hudi's Merge-on-Read (MOR) semantic + * + * @param sc spark's context + * @param config hadoop configuration + * @param fileReaders suite of base file readers + * @param tableSchema table's full schema + * @param requiredSchema expected (potentially) projected schema + * @param tableState table's state + * @param mergeType type of merge performed + * @param fileSplits target file-splits this RDD will be iterating over + */ class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, fileReaders: HoodieMergeOnReadBaseFileReaders, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, tableState: HoodieTableState, mergeType: String, @@ -90,18 +117,19 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val iter = mergeOnReadPartition.split match { case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => - fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get) + val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema) + projectedReader(dataFileOnlySplit.dataFile.get) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => new LogFileIterator(logFileOnlySplit, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => - val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get) - new SkipMergeIterator(split, baseFileIterator, getConfig) + val reader = fileReaders.requiredSchemaReaderSkipMerging + new SkipMergeIterator(split, reader, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => - val (baseFileIterator, schema) = readBaseFile(split) - new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig) + val reader = pickBaseFileReader + new RecordMergingFileIterator(split, reader, getConfig) case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + @@ -120,7 +148,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, iter } - private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = { + private def pickBaseFileReader: BaseFileReader = { // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum // of the stored data possible, while still properly executing corresponding relation's semantic // and meet the query's requirements. 
@@ -129,10 +157,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // a) It does use one of the standard (and whitelisted) Record Payload classes // then we can avoid reading and parsing the records w/ _full_ schema, and instead only // rely on projected one, nevertheless being able to perform merging correctly - if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) - (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema) - else - (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema) + if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) { + fileReaders.requiredSchemaReader + } else { + fileReaders.fullSchemaReader + } } override protected def getPartitions: Array[Partition] = @@ -156,38 +185,27 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema - protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr) + protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) protected var recordToLoad: InternalRow = _ - // TODO validate whether we need to do UnsafeProjection - protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema) - - // NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals" - // w/in the record payload. This is required, to project records read from the Delta Log file - // which always reads records in full schema (never projected, due to the fact that DL file might - // be stored in non-columnar formats like Avro, HFile, etc) - private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema) + private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema) private var logScanner = { - val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) + val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, maxCompactionMemoryInBytes, config, internalSchema) } private val logRecords = logScanner.getRecords.asScala - // NOTE: This iterator iterates over already projected (in required schema) records // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = logRecords.iterator.map { case (_, record) => - val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) - avroRecordOpt.map { - avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) - } + toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) + .map(_.asInstanceOf[GenericRecord]) } protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = @@ -205,7 +223,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, 
// Record has been deleted, skipping this.hasNextInternal } else { - recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get)) + val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get) + recordToLoad = deserialize(projectedAvroRecord) true } } @@ -229,14 +248,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) */ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], + baseFileReader: BaseFileReader, config: Configuration) extends LogFileIterator(split, config) { + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) + + private val baseFileIterator = baseFileReader(split.dataFile.get) + override def hasNext: Boolean = { if (baseFileIterator.hasNext) { - val curRow = baseFileIterator.next() - recordToLoad = curRow + // No merge is required, simply load current row and project into required schema + recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next()) true } else { super[LogFileIterator].hasNext @@ -250,8 +273,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, * streams */ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - baseFileReaderSchema: HoodieTableSchema, + baseFileReader: BaseFileReader, config: Configuration) extends LogFileIterator(split, config) { @@ -260,13 +282,17 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // - Projected schema // As such, no particular schema could be assumed, and therefore we rely on the caller // to correspondingly set the scheme of the expected output of base-file reader - private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr) - private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema) + private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record") + + private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false) - private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, - baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) + private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) + private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField) + + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) + + private val baseFileIterator = baseFileReader(split.dataFile.get) override def hasNext: Boolean = hasNextInternal @@ -275,26 +301,22 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // handling records @tailrec private def hasNextInternal: Boolean = { if (baseFileIterator.hasNext) { - val curRowRecord = baseFileIterator.next() - val curKey = curRowRecord.getString(recordKeyOrdinal) + val curRow = baseFileIterator.next() + val curKey = curRow.getString(recordKeyOrdinal) val updatedRecordOpt = removeLogRecord(curKey) if (updatedRecordOpt.isEmpty) { - // No merge needed, load current row with required projected 
schema - recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals)) + // No merge is required, simply load current row and project into required schema + recordToLoad = requiredSchemaUnsafeProjection(curRow) true } else { - val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) + val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) if (mergedAvroRecordOpt.isEmpty) { // Record has been deleted, skipping this.hasNextInternal } else { - // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c - // record from the Delta Log will bear (full) Table schema, while record from the Base file - // might already be read in projected one (as an optimization). - // As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback - // to [[projectAvro]] - val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder) - recordToLoad = unsafeProjection(deserialize(projectedAvroRecord)) + val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], + requiredAvroSchema, reusableRecordBuilder) + recordToLoad = deserialize(projectedAvroRecord) true } } @@ -381,66 +403,10 @@ private object HoodieMergeOnReadRDD { } } - /** - * Projects provided instance of [[InternalRow]] into provided schema, assuming that the - * the schema of the original row is strictly a superset of the given one - */ - private def projectRowUnsafe(row: InternalRow, - projectedSchema: StructType, - ordinals: Seq[Int]): InternalRow = { - val projectedRow = new SpecificInternalRow(projectedSchema) - var curIndex = 0 - projectedSchema.zip(ordinals).foreach { case (field, pos) => - val curField = if (row.isNullAt(pos)) { - null - } else { - row.get(pos, field.dataType) - } - projectedRow.update(curIndex, curField) - curIndex += 1 - } - projectedRow - } - - /** - * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the - * the schema of the original row is strictly a superset of the given one - */ - def projectAvroUnsafe(record: IndexedRecord, - projectedSchema: Schema, - ordinals: List[Int], - recordBuilder: GenericRecordBuilder): GenericRecord = { + private def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = { val fields = projectedSchema.getFields.asScala - checkState(fields.length == ordinals.length) - fields.zip(ordinals).foreach { - case (field, pos) => recordBuilder.set(field, record.get(pos)) - } - recordBuilder.build() - } - - /** - * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the - * the schema of the original row is strictly a superset of the given one - * - * This is a "safe" counterpart of [[projectAvroUnsafe]]: it does build mapping of the record's - * schema into projected one itself (instead of expecting such mapping from the caller) - */ - def projectAvro(record: IndexedRecord, - projectedSchema: Schema, - recordBuilder: GenericRecordBuilder): GenericRecord = { - projectAvroUnsafe(record, projectedSchema, collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder) - } - - /** - * Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which - * will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method - * - * @param projected target projected schema 
(which is a proper subset of [[source]] [[Schema]]) - * @param source source schema of the record being projected - * @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one - */ - private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { - projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList + fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name()))) + reusableRecordBuilder.build() } private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { @@ -452,9 +418,48 @@ private object HoodieMergeOnReadRDD { .getParent } - private def resolveAvroSchemaNullability(schema: Schema) = { - AvroConversionUtils.resolveAvroTypeNullability(schema) match { - case (nullable, _) => nullable + // TODO extract to HoodieAvroSchemaUtils + abstract class AvroProjection extends (GenericRecord => GenericRecord) + + class SafeAvroProjection(sourceSchema: Schema, + projectedSchema: Schema, + reusableRecordBuilder: GenericRecordBuilder = null) extends AvroProjection { + + private val ordinals: List[Int] = collectFieldOrdinals(projectedSchema, sourceSchema) + private val recordBuilder: GenericRecordBuilder = + if (reusableRecordBuilder != null) { + reusableRecordBuilder + } else { + new GenericRecordBuilder(projectedSchema) + } + + override def apply(record: GenericRecord): GenericRecord = { + val fields = projectedSchema.getFields.asScala + checkState(fields.length == ordinals.length) + fields.zip(ordinals).foreach { + case (field, pos) => recordBuilder.set(field, record.get(pos)) + } + recordBuilder.build() + } + } + + object SafeAvroProjection { + def create(sourceSchema: Schema, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder = null): SafeAvroProjection = + new SafeAvroProjection( + sourceSchema = sourceSchema, + projectedSchema = projectedSchema, + reusableRecordBuilder = reusableRecordBuilder) + + /** + * Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which + * will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method + * + * @param projected target projected schema (which is a proper subset of [[source]] [[Schema]]) + * @param source source schema of the record being projected + * @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one + */ + private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { + projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 6fa130ac8caf8..0fc6ef2f83aec 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -17,7 +17,6 @@ package org.apache.hudi -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.model.{FileSlice, HoodieRecord} @@ -27,7 +26,9 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -58,32 +59,15 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, } protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieMergeOnReadRDD = { - val fullSchemaParquetReader = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - dataSchema = dataSchema, - requiredSchema = dataSchema, - // This file-reader is used to read base file records, subsequently merging them with the records - // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding - // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly) - // - // The only filtering applicable here is the filtering to make sure we're only fetching records that - // fall into incremental span of the timeline being queried - filters = incrementalSpanRecordFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) - ) - - val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) = - createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters) + filters: Array[Filter]): RDD[InternalRow] = { + // The only required filters are ones that make sure we're only fetching records that + // fall into incremental span of the timeline being queried + val requiredFilters = incrementalSpanRecordFilters + val optionalFilters = filters + val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) val hoodieTableState = getTableState // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately @@ -91,12 +75,8 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, - fileReaders = HoodieMergeOnReadBaseFileReaders( - fullSchemaFileReader = fullSchemaParquetReader, - requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging, - requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging - ), - dataSchema = dataSchema, + fileReaders = readers, + tableSchema = tableSchema, requiredSchema = requiredSchema, tableState = hoodieTableState, mergeType = mergeType, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index c6d4eafafc91d..1cb35aa61dc4d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression @@ -80,44 +81,113 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieMergeOnReadRDD = { - val fullSchemaBaseFileReader = createBaseFileReader( + filters: Array[Filter]): RDD[InternalRow] = { + val requiredFilters = Seq.empty + val optionalFilters = filters + val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + + val tableState = getTableState + new HoodieMergeOnReadRDD( + sqlContext.sparkContext, + config = jobConf, + fileReaders = readers, + tableSchema = tableSchema, + requiredSchema = requiredSchema, + tableState = tableState, + mergeType = mergeType, + fileSplits = fileSplits) + } + + protected def createBaseFileReaders(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + requiredFilters: Seq[Filter], + optionalFilters: Seq[Filter] = Seq.empty): HoodieMergeOnReadBaseFileReaders = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + + val fullSchemaReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = dataSchema, + requiredDataSchema = dataSchema, // This file-reader is used to read base file records, subsequently merging them with the records // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly) - filters = Seq.empty, + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) ) - val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) = - createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters) - - val tableState = getTableState - new HoodieMergeOnReadRDD( - sqlContext.sparkContext, - config = jobConf, - fileReaders = HoodieMergeOnReadBaseFileReaders( - fullSchemaFileReader = fullSchemaBaseFileReader, - requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging, - requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging - ), + val requiredSchemaReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredSchema, - tableState = tableState, - mergeType = mergeType, - fileSplits = fileSplits) + requiredDataSchema = requiredDataSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding + // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) + ) + + // Check whether fields required for merging were also requested to be fetched + // by the query: + // - In case they were, there's no optimization we could apply here (we will have + // to fetch such fields) + // - In case they were not, we will provide 2 separate file-readers + // a) One which would be applied to file-groups w/ delta-logs (merging) + // b) One which would be applied to file-groups w/ no delta-logs or + // in case query-mode is skipping merging + val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName) + if (mandatoryColumns.forall(requestedColumns.contains)) { + HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReader + ) + } else { + val prunedRequiredSchema = { + val superfluousColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) + val prunedStructSchema = + StructType(requiredDataSchema.structTypeSchema.fields + .filterNot(f => superfluousColumnNames.contains(f.name))) + + HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString) + } + + val requiredSchemaReaderSkipMerging = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = prunedRequiredSchema, + // This file-reader is 
only used in cases when no merging is performed, therefore it's safe to push + // down these filters to the base file readers + filters = requiredFilters ++ optionalFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) + ) + + HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging + ) + } } protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { @@ -156,7 +226,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredDataSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it @@ -189,7 +259,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = prunedRequiredSchema, + requiredDataSchema = prunedRequiredSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index b5d19bd37d682..6a04ec57e1127 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils { * Returns only [[AttributeReference]] contained as a sub-expression */ object AllowedTransformationExpression extends SparkAdapterSupport { - val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils() + val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils def unapply(expr: Expression): Option[AttributeReference] = { // First step, we check that expression diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index f2ae31a0f7286..289f800b27f28 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure with Logging with SparkAdapterSupport { - private val exprUtils = sparkAdapter.getCatalystExpressionUtils() + private val exprUtils = sparkAdapter.getCatalystExpressionUtils /** * OPTIMIZE table_name|table_path [WHERE predicate] diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index 
bd5aa01216fcb..9300b94bb9cc0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -57,7 +57,7 @@ case class IndexRow(fileName: String, class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport { - val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils() + val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils var spark: SparkSession = _ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 57c826af92ca6..9aa3c509c3dab 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.hudi -import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.internal.SQLConf import java.io.File diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index fdc085780047c..30af252d2dc16 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -40,7 +40,7 @@ import scala.collection.mutable.ArrayBuffer */ class Spark2Adapter extends SparkAdapter { - override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark2CatalystPlanUtils diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 8093d7069220e..028bb5788cc29 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -19,20 +19,20 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils} import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat} 
import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils} /** * Implementation of [[SparkAdapter]] for Spark 3.1.x */ class Spark3_1Adapter extends BaseSpark3Adapter { - def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils - override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils + override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index ceb66b7437ed2..fe25ee7fdc6b8 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -30,15 +30,15 @@ import org.apache.spark.sql._ */ class Spark3_2Adapter extends BaseSpark3Adapter { - def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils - override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable) override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType) - override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils + + override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { Some(