diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 69005cd75332c..0844c7361bfc2 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -206,17 +206,6 @@ object AvroConversionUtils { SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] } - def buildAvroRecordBySchema(record: IndexedRecord, - requiredSchema: Schema, - requiredPos: Seq[Int], - recordBuilder: GenericRecordBuilder): GenericRecord = { - val requiredFields = requiredSchema.getFields.asScala - assert(requiredFields.length == requiredPos.length) - val positionIterator = requiredPos.iterator - requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next()))) - recordBuilder.build() - } - def getAvroRecordNameAndNamespace(tableName: String): (String, String) = { val name = HoodieAvroUtils.sanitizeName(tableName) (s"${name}_record", s"hoodie.${name}") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index da4e8d30e206f..dfdb90801970c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext} +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources._ @@ -52,6 +52,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, override type FileSplit = HoodieBaseFileSplit + override lazy val mandatoryColumns: Seq[String] = + Seq(recordKeyField) + protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], partitionSchema: StructType, tableSchema: HoodieTableSchema, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 98fc218879734..ac2937e57ea16 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hbase.io.hfile.CacheConfig import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable} +import org.apache.hudi.HoodieBaseRelation.getPartitionPath import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.config.SerializableConfiguration import org.apache.hudi.common.fs.FSUtils @@ -32,8 +32,9 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import 
org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.io.storage.HoodieHFileReader -import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata} +import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -53,8 +54,12 @@ trait HoodieFileSplit {} case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String) -case class HoodieTableState(recordKeyField: String, - preCombineFieldOpt: Option[String]) +case class HoodieTableState(tablePath: String, + latestCommitTimestamp: String, + recordKeyField: String, + preCombineFieldOpt: Option[String], + usesVirtualKeys: Boolean, + recordPayloadClassName: String) /** * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. @@ -78,13 +83,30 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val basePath: String = metaClient.getBasePath - // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one - // NOTE: This is historical behavior which is preserved as is + // NOTE: Record key-field is assumed to be singular here due to either of the following: + // - In case Hudi's meta fields are enabled: record key will be pre-materialized (stored) as part + // of the record's payload (as part of Hudi's metadata) + // - In case Hudi's meta fields are disabled (virtual keys): in that case the record has to bear a _single field_ + // identified as its (unique) primary key w/in its payload (this is a limitation of [[SimpleKeyGenerator]], + // which is the only [[KeyGenerator]] permitted for virtual-keys payloads) protected lazy val recordKeyField: String = - if (tableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD - else tableConfig.getRecordKeyFieldProp + if (tableConfig.populateMetaFields()) { + HoodieRecord.RECORD_KEY_METADATA_FIELD + } else { + val keyFields = tableConfig.getRecordKeyFields.get() + checkState(keyFields.length == 1) + keyFields.head + } - protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty + protected lazy val preCombineFieldOpt: Option[String] = + Option(tableConfig.getPreCombineField) + .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match { + // NOTE: This is required to compensate for cases when empty string is used to stub + // property value to avoid it being set with the default value + // TODO(HUDI-3456) cleanup + case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f) + case _ => None + } protected lazy val specifiedQueryTimestamp: Option[String] = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) @@ -118,16 +140,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, FileStatusCache.getOrCreate(sparkSession)) /** + * Columns that the relation has to read from storage in order to properly execute its semantics: for example, + * for Merge-on-Read tables the key field as well as the pre-combine field comprise the mandatory set of columns, + * meaning that they will be fetched regardless of whether they are requested by the query, so that the + * relation is able to combine records properly (if necessary) + * * @VisibleInTests */ - lazy val mandatoryColumns: Seq[String] = { - if (isMetadataTable(metaClient)) { - 
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) - } else { - // TODO this is MOR table requirement, not necessary for COW - Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) - } - } + val mandatoryColumns: Seq[String] protected def timeline: HoodieTimeline = // NOTE: We're including compaction here since it's not considering a "commit" operation @@ -136,9 +156,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def latestInstant: Option[HoodieInstant] = toScalaOption(timeline.lastInstant()) - protected def queryTimestamp: Option[String] = { - specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(i => i.getTimestamp)) - } + protected def queryTimestamp: Option[String] = + specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) override def schema: StructType = tableStructSchema @@ -257,15 +276,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, requestedColumns ++ missing } - private def getPrecombineFieldProperty: Option[String] = - Option(tableConfig.getPreCombineField) - .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match { - // NOTE: This is required to compensate for cases when empty string is used to stub - // property value to avoid it being set with the default value - // TODO(HUDI-3456) cleanup - case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f) - case _ => None - } + protected def getTableState: HoodieTableState = { + // Subset of the state of table's configuration as of at the time of the query + HoodieTableState( + tablePath = basePath, + latestCommitTimestamp = queryTimestamp.get, + recordKeyField = recordKeyField, + preCombineFieldOpt = preCombineFieldOpt, + usesVirtualKeys = !tableConfig.populateMetaFields(), + recordPayloadClassName = tableConfig.getPayloadClass + ) + } private def imbueConfigs(sqlContext: SQLContext): Unit = { sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") @@ -280,9 +301,6 @@ object HoodieBaseRelation { def getPartitionPath(fileStatus: FileStatus): Path = fileStatus.getPath.getParent - def isMetadataTable(metaClient: HoodieTableMetaClient): Boolean = - HoodieTableMetadata.isMetadataTable(metaClient.getBasePath) - /** * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]] * over [[InternalRow]] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 0871487b5e8c6..487f6a6788431 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -65,28 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper { } } - /** - * Convert [[InternalRow]] to [[SpecificInternalRow]]. 
- */ - def createInternalRowWithSchema( - row: InternalRow, - schema: StructType, - positions: Seq[Int]): InternalRow = { - val rowToReturn = new SpecificInternalRow(schema) - var curIndex = 0 - schema.zip(positions).foreach { case (field, pos) => - val curField = if (row.isNullAt(pos)) { - null - } else { - row.get(pos, field.dataType) - } - rowToReturn.update(curIndex, curField) - curIndex += 1 - } - rowToReturn - } - - def splitFiles( sparkSession: SparkSession, file: FileStatus, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala index cf68981d8318f..a176626f76421 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -20,64 +20,15 @@ package org.apache.hudi import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException} -import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit -/** - * TODO eval if we actually need it - */ class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], @transient fileSplits: Seq[HoodieBaseFileSplit]) - extends HoodieUnsafeRDD(sparkSession.sparkContext) { - - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val iterator = new Iterator[InternalRow] with AutoCloseable { - private[this] val files = split.asInstanceOf[FilePartition].files.toIterator - private[this] var currentFile: PartitionedFile = _ - private[this] var currentIterator: Iterator[InternalRow] = _ - - override def hasNext: Boolean = { - (currentIterator != null && currentIterator.hasNext) || nextIterator() - } - - def next(): InternalRow = currentIterator.next() - - /** Advances to the next file. Returns true if a new non-empty iterator is available. */ - private def nextIterator(): Boolean = { - if (files.hasNext) { - currentFile = files.next() - logInfo(s"Reading File $currentFile") - currentIterator = readFunction(currentFile) - - try { - hasNext - } catch { - case e: SchemaColumnConvertNotSupportedException => - val message = "Parquet column cannot be converted in " + - s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + - s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" - throw new QueryExecutionException(message, e) - - case e => throw e - } - } else { - currentFile = null - false - } - } - - override def close(): Unit = {} - } - - // Register an on-task-completion callback to close the input stream. 
- context.addTaskCompletionListener[Unit](_ => iterator.close()) - - iterator.asInstanceOf[Iterator[InternalRow]] - } + extends FileScanRDD(sparkSession, readFunction, fileSplits.map(_.filePartition)) + with HoodieUnsafeRDD { - override protected def getPartitions: Array[Partition] = fileSplits.map(_.filePartition).toArray + override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect() } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index cc2915d605ff7..ae4b7ebf01463 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -22,29 +22,34 @@ import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieDataSourceHelper._ -import org.apache.hudi.HoodieMergeOnReadRDD.resolveAvroSchemaNullability +import org.apache.hadoop.mapred.JobConf +import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability} import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath -import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.HoodieAvroDeserializer import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} import java.io.Closeable import java.util.Properties import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.util.Try case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition @@ -53,14 +58,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, fullSchemaFileReader: PartitionedFile => Iterator[InternalRow], requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow], - tableState: HoodieTableState, tableSchema: 
HoodieTableSchema, requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + mergeType: String, @transient fileSplits: Seq[HoodieMergeOnReadFileSplit]) - extends HoodieUnsafeRDD(sc) { + extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD { + + protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) private val confBroadcast = sc.broadcast(new SerializableWritable(config)) - private val recordKeyField = tableState.recordKeyField private val payloadProps = tableState.preCombineFieldOpt .map(preCombineField => HoodiePayloadConfig.newBuilder @@ -70,34 +77,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, ) .getOrElse(new Properties()) + private val whitelistedPayloadClasses: Set[String] = Seq( + classOf[OverwriteWithLatestAvroPayload] + ).map(_.getName).toSet + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val iter = mergeOnReadPartition.split match { case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => - requiredSchemaFileReader(dataFileOnlySplit.dataFile.get) + requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get) + case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => - logFileIterator(logFileOnlySplit, getConfig) - case skipMergeSplit if skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => - skipMergeFileIterator(skipMergeSplit, requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig) - case payloadCombineSplit - if payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => - payloadCombineFileIterator(payloadCombineSplit, fullSchemaFileReader(payloadCombineSplit.dataFile.get), - getConfig) + new LogFileIterator(logFileOnlySplit, getConfig) + + case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => + val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get) + new SkipMergeIterator(split, baseFileIterator, getConfig) + + case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => + val (baseFileIterator, schema) = readBaseFile(split) + new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig) + case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" + - s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" + + s"hoodie table path: ${tableState.tablePath}" + s"spark partition Index: ${mergeOnReadPartition.index}" + - s"merge type: ${mergeOnReadPartition.split.mergeType}") + s"merge type: ${mergeType}") } + if (iter.isInstanceOf[Closeable]) { // register a callback to close logScanner which will be executed on task completion. // when tasks finished, this method will be called, and release resources. Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close())) } + iter } + private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = { + // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum + // of the stored data possible, while still properly executing corresponding relation's semantic + // and meet the query's requirements. 
+ // + // Here we assume that iff the queried table + // a) uses one of the standard (and whitelisted) Record Payload classes, + // then we can avoid reading and parsing the records w/ the _full_ schema, and instead rely + // only on the projected one, while still being able to perform merging correctly + if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) + (fullSchemaFileReader(split.dataFile.get), tableSchema) + else + (requiredSchemaFileReader(split.dataFile.get), requiredSchema) + } + override protected def getPartitions: Array[Partition] = fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray @@ -108,270 +140,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } } - private def logFileIterator(split: HoodieMergeOnReadFileSplit, - config: Configuration): Iterator[InternalRow] = - new Iterator[InternalRow] with Closeable with SparkAdapterSupport { - private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - private val requiredFieldPosition = - requiredSchema.structTypeSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList - private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema) - private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema) - private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config) - private val logRecords = logScanner.getRecords - private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala - - private var recordToLoad: InternalRow = _ - - override def hasNext: Boolean = { - if (logRecordsKeyIterator.hasNext) { - val curAvrokey = logRecordsKeyIterator.next() - val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps) - if (!curAvroRecord.isPresent) { - // delete record found, skipping - this.hasNext - } else { - val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, - requiredFieldPosition, recordBuilder) - val rowOpt = deserializer.deserialize(requiredAvroRecord) - recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow]) - true + /** + * Provided w/ an instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in + * Delta Log files (represented as [[InternalRow]]s) + */ + private class LogFileIterator(split: HoodieMergeOnReadFileSplit, + config: Configuration) + extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport { + + protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) + protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema + + protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) + + protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) + protected var recordToLoad: InternalRow = _ + + // TODO validate whether we need to do UnsafeProjection + protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema) + + // NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals" + // w/in the record payload. 
This is required, to project records read from the Delta Log file + // which always reads records in full schema (never projected, due to the fact that DL file might + // be stored in non-columnar formats like Avro, HFile, etc) + private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema) + + private var logScanner = + HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, + maxCompactionMemoryInBytes, config) + + private val logRecords = logScanner.getRecords.asScala + + // NOTE: This iterator iterates over already projected (in required schema) records + // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's + // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier + protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = + logRecords.iterator.map { + case (_, record) => + val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) + avroRecordOpt.map { + avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) } - } else { - false - } } - override def next(): InternalRow = { - recordToLoad - } - - override def close(): Unit = { - if (logScanner != null) { - try { - logScanner.close() - } finally { - logScanner = null - } - } - } - } + protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = + logRecords.remove(key) - private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - config: Configuration): Iterator[InternalRow] = - new Iterator[InternalRow] with Closeable with SparkAdapterSupport { - private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - private val requiredFieldPosition = - requiredSchema.structTypeSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList - private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema) - private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema) - private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config) - private val logRecords = logScanner.getRecords - private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala - - private var recordToLoad: InternalRow = _ - - @scala.annotation.tailrec - override def hasNext: Boolean = { - if (baseFileIterator.hasNext) { - val curRow = baseFileIterator.next() - recordToLoad = unsafeProjection(curRow) - true + override def hasNext: Boolean = + logRecordsIterator.hasNext && { + val avroRecordOpt = logRecordsIterator.next() + if (avroRecordOpt.isEmpty) { + // Record has been deleted, skipping + this.hasNext } else { - if (logRecordsKeyIterator.hasNext) { - val curAvrokey = logRecordsKeyIterator.next() - val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps) - if (!curAvroRecord.isPresent) { - // delete record found, skipping - this.hasNext - } else { - val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, - requiredFieldPosition, recordBuilder) - val rowOpt = 
deserializer.deserialize(requiredAvroRecord) - recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow]) - true - } - } else { - false - } + recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get)) + true } } - override def next(): InternalRow = { - recordToLoad - } + override final def next(): InternalRow = recordToLoad - override def close(): Unit = { - if (logScanner != null) { - try { - logScanner.close() - } finally { - logScanner = null - } + override def close(): Unit = + if (logScanner != null) { + try { + logScanner.close() + } finally { + logScanner = null } } + } + + /** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in + * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not + * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) + */ + private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, + baseFileIterator: Iterator[InternalRow], + config: Configuration) + extends LogFileIterator(split, config) { + + override def hasNext: Boolean = { + if (baseFileIterator.hasNext) { + val curRow = baseFileIterator.next() + recordToLoad = unsafeProjection(curRow) + true + } else { + super[LogFileIterator].hasNext + } } + } - private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - config: Configuration): Iterator[InternalRow] = - new Iterator[InternalRow] with Closeable with SparkAdapterSupport { - private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - private val requiredFieldPosition = - requiredSchema.structTypeSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList - private val serializer = sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema, - resolveAvroSchemaNullability(tableAvroSchema)) - private val requiredDeserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema) - private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema) - private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config) - private val logRecords = logScanner.getRecords - private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala - private val keyToSkip = mutable.Set.empty[String] - private val recordKeyPosition = tableSchema.structTypeSchema.fieldIndex(recordKeyField) - - private var recordToLoad: InternalRow = _ - - @scala.annotation.tailrec - override def hasNext: Boolean = { - if (baseFileIterator.hasNext) { - val curRow = baseFileIterator.next() - val curKey = curRow.getString(recordKeyPosition) - if (logRecords.containsKey(curKey)) { - // duplicate key found, merging - keyToSkip.add(curKey) - val mergedAvroRecord = mergeRowWithLog(curRow, curKey) - if (!mergedAvroRecord.isPresent) { - // deleted - this.hasNext - } else { - // load merged record as InternalRow with required schema - val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), requiredAvroSchema, - requiredFieldPosition, recordBuilder) - val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord) - recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow]) - true - } - } else { - // No 
merge needed, load current row with required schema - recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema.structTypeSchema, requiredFieldPosition)) - true - } + /** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in + * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these + * streams + */ + private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, + baseFileIterator: Iterator[InternalRow], + baseFileReaderSchema: HoodieTableSchema, + config: Configuration) + extends LogFileIterator(split, config) { + + // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either + // - Full table's schema + // - Projected schema + // As such, no particular schema could be assumed, and therefore we rely on the caller + // to correspondingly set the scheme of the expected output of base-file reader + private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr) + private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema) + + private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, + baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) + + private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) + + override def hasNext: Boolean = { + if (baseFileIterator.hasNext) { + val curRowRecord = baseFileIterator.next() + val curKey = curRowRecord.getString(recordKeyOrdinal) + val updatedRecordOpt = removeLogRecord(curKey) + if (updatedRecordOpt.isEmpty) { + // No merge needed, load current row with required projected schema + recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals)) + true } else { - if (logRecordsKeyIterator.hasNext) { - val curKey = logRecordsKeyIterator.next() - if (keyToSkip.contains(curKey)) { - this.hasNext - } else { - val insertAvroRecord = logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps) - if (!insertAvroRecord.isPresent) { - // stand alone delete record, skipping - this.hasNext - } else { - val requiredAvroRecord = AvroConversionUtils - .buildAvroRecordBySchema( - insertAvroRecord.get(), - requiredAvroSchema, - requiredFieldPosition, - recordBuilder - ) - val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord) - recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow]) - true - } - } + val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) + if (mergedAvroRecordOpt.isEmpty) { + // Record has been deleted, skipping + this.hasNext } else { - false + // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c + // record from the Delta Log will bear (full) Table schema, while record from the Base file + // might already be read in projected one (as an optimization). 
+ // As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback + // to [[projectAvro]] + val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder) + recordToLoad = unsafeProjection(deserialize(projectedAvroRecord)) + true } } + } else { + super[LogFileIterator].hasNext } + } - override def next(): InternalRow = recordToLoad - - override def close(): Unit = { - if (logScanner != null) { - try { - logScanner.close() - } finally { - logScanner = null - } - } - } + private def serialize(curRowRecord: InternalRow): GenericRecord = + serializer.serialize(curRowRecord).asInstanceOf[GenericRecord] - private def mergeRowWithLog(curRow: InternalRow, curKey: String) : org.apache.hudi.common.util.Option[IndexedRecord] = { - val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord] - val mergedRec = logRecords.get(curKey).getData - .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps) - if (mergedRec.isPresent && mergedRec.get().getSchema != tableAvroSchema) { - org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord], tableAvroSchema).asInstanceOf[IndexedRecord]) - } else { - mergedRec - } - } + private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { + // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API + // on the record from the Delta Log + toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) } + } } private object HoodieMergeOnReadRDD { + val CONFIG_INSTANTIATION_LOCK = new Object() - def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = { - val fs = FSUtils.getFs(split.tablePath, config) - val logFiles = split.logFiles.get + def scanLog(logFiles: List[HoodieLogFile], + partitionPath: Path, + logSchema: Schema, + tableState: HoodieTableState, + maxCompactionMemoryInBytes: Long, + hadoopConf: Configuration): HoodieMergedLogRecordScanner = { + val tablePath = tableState.tablePath + val fs = FSUtils.getFs(tablePath, hadoopConf) - if (HoodieTableMetadata.isMetadataTable(split.tablePath)) { + if (HoodieTableMetadata.isMetadataTable(tablePath)) { val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build() - val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath) + val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath) val metadataTable = new HoodieBackedTableMetadata( - new HoodieLocalEngineContext(config), metadataConfig, + new HoodieLocalEngineContext(hadoopConf), metadataConfig, dataTableBasePath, - config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) + hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level // of indirection among MT partitions) - val relativePartitionPath = getRelativePartitionPath(new Path(split.tablePath), getPartitionPath(split)) + val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath) metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath).getLeft } else { val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() 
.withFileSystem(fs) - .withBasePath(split.tablePath) - .withLogFilePaths(split.logFiles.get.map(logFile => getFilePath(logFile.getPath)).asJava) + .withBasePath(tablePath) + .withLogFilePaths(logFiles.map(logFile => getFilePath(logFile.getPath)).asJava) .withReaderSchema(logSchema) - .withLatestInstantTime(split.latestCommit) + .withLatestInstantTime(tableState.latestCommitTimestamp) .withReadBlocksLazily( - Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, + Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) .getOrElse(false)) .withReverseReader(false) .withBufferSize( - config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, + hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) - .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes) + .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes) .withSpillableMapBasePath( - config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, + hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) if (logFiles.nonEmpty) { - logRecordScannerBuilder.withPartition(getRelativePartitionPath(new Path(split.tablePath), logFiles.head.getPath.getParent)) + logRecordScannerBuilder.withPartition( + getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent)) } logRecordScannerBuilder.build() } } + /** + * Projects provided instance of [[InternalRow]] into provided schema, assuming that the + * schema of the original row is strictly a superset of the given one + */ + private def projectRowUnsafe(row: InternalRow, + projectedSchema: StructType, + ordinals: Seq[Int]): InternalRow = { + val projectedRow = new SpecificInternalRow(projectedSchema) + var curIndex = 0 + projectedSchema.zip(ordinals).foreach { case (field, pos) => + val curField = if (row.isNullAt(pos)) { + null + } else { + row.get(pos, field.dataType) + } + projectedRow.update(curIndex, curField) + curIndex += 1 + } + projectedRow + } + + /** + * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the + * schema of the original row is strictly a superset of the given one + */ + def projectAvroUnsafe(record: IndexedRecord, + projectedSchema: Schema, + ordinals: List[Int], + recordBuilder: GenericRecordBuilder): GenericRecord = { + val fields = projectedSchema.getFields.asScala + checkState(fields.length == ordinals.length) + fields.zip(ordinals).foreach { + case (field, pos) => recordBuilder.set(field, record.get(pos)) + } + recordBuilder.build() + } + + /** + * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the + * schema of the original row is strictly a superset of the given one + * + * This is a "safe" counterpart of [[projectAvroUnsafe]]: it builds the mapping of the record's + * schema into the projected one itself (instead of expecting such mapping from the caller) + */ + def projectAvro(record: IndexedRecord, + projectedSchema: Schema, + recordBuilder: GenericRecordBuilder): GenericRecord = { + projectAvroUnsafe(record, projectedSchema, collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder) + } + + /** + * Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which + * will be subsequently used by either [[projectRowUnsafe]] or 
[[projectAvroUnsafe()]] method + * + * @param projected target projected schema (which is a proper subset of [[source]] [[Schema]]) + * @param source source schema of the record being projected + * @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one + */ + private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { + projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList + } + private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { // Determine partition path as an immediate parent folder of either // - The base file // - Some log file split.dataFile.map(baseFile => new Path(baseFile.filePath)) - .getOrElse(split.logFiles.get.head.getPath) + .getOrElse(split.logFiles.head.getPath) .getParent } @@ -380,4 +426,17 @@ private object HoodieMergeOnReadRDD { case (nullable, _) => nullable } } + + trait AvroDeserializerSupport extends SparkAdapterSupport { + protected val requiredAvroSchema: Schema + protected val requiredStructTypeSchema: StructType + + private lazy val deserializer: HoodieAvroDeserializer = + sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredStructTypeSchema) + + protected def deserialize(avroRecord: GenericRecord): InternalRow = { + checkState(avroRecord.getSchema.getFields.size() == requiredStructTypeSchema.fields.length) + deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow] + } + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala index 3f95746a54669..51b03a0024efc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala @@ -56,12 +56,8 @@ import org.apache.spark.{Partition, SparkContext, TaskContext} * NOTE: It enforces, for ex, that all of the RDDs implement [[compute]] method returning * [[InternalRow]] to avoid superfluous ser/de */ -abstract class HoodieUnsafeRDD(@transient sc: SparkContext) - extends RDD[InternalRow](sc, Nil) { - - def compute(split: Partition, context: TaskContext): Iterator[InternalRow] - - override final def collect(): Array[InternalRow] = +trait HoodieUnsafeRDD extends RDD[InternalRow] { + override def collect(): Array[InternalRow] = throw new UnsupportedOperationException( "This method will not function correctly, please refer to scala-doc for HoodieUnsafeRDD" ) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 2517252d700fb..c7620eeedb6a3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -90,12 +90,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, hadoopConf = new Configuration(conf) ) - val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) - + val hoodieTableState = getTableState // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately // filtered, since file-reader might not be capable to perform filtering - new 
HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, - requiredSchemaParquetReader, hoodieTableState, tableSchema, requiredSchema, fileSplits) + new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader, + tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits) } override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index d2515e3297d0f..70fb6430aefc9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -39,11 +39,7 @@ import org.apache.spark.sql.types.StructType import scala.collection.JavaConverters._ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], - logFiles: Option[List[HoodieLogFile]], - latestCommit: String, - tablePath: String, - maxCompactionMemoryInBytes: Long, - mergeType: String) extends HoodieFileSplit + logFiles: List[HoodieLogFile]) extends HoodieFileSplit class MergeOnReadSnapshotRelation(sqlContext: SQLContext, optParams: Map[String, String], @@ -54,13 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, override type FileSplit = HoodieMergeOnReadFileSplit - private val mergeType = optParams.getOrElse( - DataSourceReadOptions.REALTIME_MERGE.key, - DataSourceReadOptions.REALTIME_MERGE.defaultValue) + override lazy val mandatoryColumns: Seq[String] = + Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) - private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key, + DataSourceReadOptions.REALTIME_MERGE.defaultValue) - protected override def composeRDD(fileIndex: Seq[HoodieMergeOnReadFileSplit], + protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], partitionSchema: StructType, tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, @@ -93,10 +89,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, hadoopConf = new Configuration(conf) ) - val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt) - - new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, - requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex) + val tableState = getTableState + new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader, + tableSchema, requiredSchema, tableState, mergeType, fileSplits) } protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { @@ -123,15 +118,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, protected def buildSplits(fileSlices: Seq[FileSlice]): List[HoodieMergeOnReadFileSplit] = { fileSlices.map { fileSlice => val baseFile = toScalaOption(fileSlice.getBaseFile) - val logFiles = Option(fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList) + val logFiles = 
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList val partitionedBaseFile = baseFile.map { file => val filePath = getFilePath(file.getFileStatus.getPath) PartitionedFile(InternalRow.empty, filePath, 0, file.getFileLen) } - HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles, queryTimestamp.get, - metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles) }.toList } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala index 1ac8fa098119f..a21a29634e980 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark import org.apache.hudi.HoodieUnsafeRDD +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.util.MutablePair diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 96d50f6b57b80..0776a3116af77 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -29,7 +29,7 @@ import org.apache.hudi.exception.{HoodieException, HoodieUpsertException} import org.apache.hudi.keygen._ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieMergeOnReadRDD} +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, concat, lit, udf} import org.apache.spark.sql.types._ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index ca5f79191a729..20b13fcab37d5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -18,22 +18,22 @@ package org.apache.hudi.functional import org.apache.avro.Schema -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD} import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator} import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import 
org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD} import org.apache.parquet.hadoop.util.counters.BenchmarkCounter import org.apache.spark.HoodieUnsafeRDDUtils import org.apache.spark.internal.Logging -import org.apache.spark.sql.{Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.{Dataset, Row, SaveMode} import org.junit.jupiter.api.Assertions.{assertEquals, fail} import org.junit.jupiter.api.{Tag, Test} -import scala.:+ import scala.collection.JavaConverters._ @Tag("functional") @@ -67,14 +67,14 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with val projectedColumnsReadStats: Array[(String, Long)] = if (HoodieSparkUtils.isSpark3) Array( - ("rider", 2452), - ("rider,driver", 2552), - ("rider,driver,tip_history", 3517)) + ("rider", 2363), + ("rider,driver", 2463), + ("rider,driver,tip_history", 3428)) else if (HoodieSparkUtils.isSpark2) Array( - ("rider", 2595), - ("rider,driver", 2735), - ("rider,driver,tip_history", 3750)) + ("rider", 2474), + ("rider,driver", 2614), + ("rider,driver,tip_history", 3629)) else fail("Only Spark 3 and Spark 2 are currently supported") @@ -107,31 +107,30 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with else fail("Only Spark 3 and Spark 2 are currently supported") - // Stats for the reads fetching _all_ columns (note, how amount of bytes read - // is invariant of the # of columns) - val fullColumnsReadStats: Array[(String, Long)] = + // Test MOR / Snapshot / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Snapshot / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) in Read Optimized mode (which is essentially equivalent to COW) + val projectedColumnsReadStatsReadOptimized: Array[(String, Long)] = if (HoodieSparkUtils.isSpark3) Array( - ("rider", 14166), - ("rider,driver", 14166), - ("rider,driver,tip_history", 14166)) + ("rider", 2363), + ("rider,driver", 2463), + ("rider,driver,tip_history", 3428)) else if (HoodieSparkUtils.isSpark2) - // TODO re-enable tests (these tests are very unstable currently) Array( - ("rider", -1), - ("rider,driver", -1), - ("rider,driver,tip_history", -1)) + ("rider", 2474), + ("rider,driver", 2614), + ("rider,driver,tip_history", 3629)) else fail("Only Spark 3 and Spark 2 are currently supported") - // Test MOR / Snapshot / Skip-merge - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) - - // Test MOR / Snapshot / Payload-combine - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats) - // Test MOR / Read Optimized - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats) + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized) } @Test @@ -163,17 +162,76 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with else 
fail("Only Spark 3 and Spark 2 are currently supported") - // Stats for the reads fetching _all_ columns (currently for MOR to be able to merge - // records properly full row has to be fetched; note, how amount of bytes read + // Test MOR / Snapshot / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Snapshot / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) in Read Optimized mode (which is essentially equivalent to COW) + val projectedColumnsReadStatsReadOptimized: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2363), + ("rider,driver", 2463), + ("rider,driver,tip_history", 3428)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2474), + ("rider,driver", 2614), + ("rider,driver,tip_history", 3629)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test MOR / Read Optimized + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized) + } + + @Test + def testMergeOnReadSnapshotRelationWithDeltaLogsFallback(): Unit = { + val tablePath = s"$basePath/mor-with-logs-fallback" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.5 + + // NOTE: This test validates MOR Snapshot Relation falling back to read "whole" row from MOR table (as + // opposed to only required columns) in following cases + // - Non-standard Record Payload is used: such Payload might rely on the fields that are not + // being queried by the Spark, and we currently have no way figuring out what these fields are, therefore + // we fallback to read whole row + val overriddenOpts = defaultWriteOpts ++ Map( + HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName + ) + + val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, overriddenOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Stats for the reads fetching _all_ columns (note, how amount of bytes read // is invariant of the # of columns) val fullColumnsReadStats: Array[(String, Long)] = if (HoodieSparkUtils.isSpark3) Array( - ("rider", 14166), - ("rider,driver", 14166), - ("rider,driver,tip_history", 14166)) + ("rider", 14167), + ("rider,driver", 14167), + ("rider,driver,tip_history", 14167)) else if (HoodieSparkUtils.isSpark2) - // TODO re-enable tests (these tests are very unstable currently) + // TODO re-enable tests (these tests are very unstable currently) Array( ("rider", -1), ("rider,driver", -1), @@ -184,11 +242,8 @@ class TestParquetColumnProjection extends 
SparkClientFunctionalTestHarness with // Test MOR / Snapshot / Skip-merge runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) - // Test MOR / Snapshot / Payload-combine + // Test MOR / Snapshot / Payload-combine (using non-standard Record Payload) runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats) - - // Test MOR / Read Optimized - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats) } // TODO add test for incremental query of the table with logs @@ -222,23 +277,6 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with else fail("Only Spark 3 and Spark 2 are currently supported") - // Stats for the reads fetching _all_ columns (note, how amount of bytes read - // is invariant of the # of columns) - val fullColumnsReadStats: Array[(String, Long)] = - if (HoodieSparkUtils.isSpark3) - Array( - ("rider", 19684), - ("rider,driver", 19684), - ("rider,driver,tip_history", 19684)) - else if (HoodieSparkUtils.isSpark2) - // TODO re-enable tests (these tests are very unstable currently) - Array( - ("rider", -1), - ("rider,driver", -1), - ("rider,driver,tip_history", -1)) - else - fail("Only Spark 3 and Spark 2 are currently supported") - val incrementalOpts: Map[String, String] = Map( DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001" ) @@ -249,10 +287,9 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with // Test MOR / Incremental / Payload-combine runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, - fullColumnsReadStats, incrementalOpts) + projectedColumnsReadStats, incrementalOpts) } - // Test routine private def runTest(tableState: TableState, queryType: String, @@ -322,6 +359,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with inputDF.write.format("org.apache.hudi") .options(opts) + .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString) .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .mode(SaveMode.Overwrite) @@ -354,6 +392,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with inputDF.write.format("org.apache.hudi") .options(opts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString) .mode(SaveMode.Append) .save(path)
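
For illustration only (not part of the patch): the core technique the change introduces for MOR reads is resolving the projected (required) schema's field ordinals against a wider schema once, and then projecting every record purely by position (see collectFieldOrdinals / projectAvroUnsafe above). A minimal, standalone Scala sketch of that idea is below; the "trip" schema and field names are made up for the example, and only plain Avro APIs are used.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}

import scala.collection.JavaConverters._

object OrdinalProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical "full" table schema and a projected (required) subset of it
    val fullSchema = new Schema.Parser().parse(
      """{"type":"record","name":"trip","fields":[
        |  {"name":"uuid","type":"string"},
        |  {"name":"rider","type":"string"},
        |  {"name":"driver","type":"string"}]}""".stripMargin)
    val projectedSchema = new Schema.Parser().parse(
      """{"type":"record","name":"trip","fields":[
        |  {"name":"uuid","type":"string"},
        |  {"name":"rider","type":"string"}]}""".stripMargin)

    // Resolve, once, the position of every projected field w/in the full schema
    // (this is what collectFieldOrdinals does for the required schema)
    val ordinals: List[Int] =
      projectedSchema.getFields.asScala.map(f => fullSchema.getField(f.name()).pos()).toList

    // A record read in the full schema (e.g. from a Delta Log block)
    val source = new GenericData.Record(fullSchema)
    source.put("uuid", "key-001")
    source.put("rider", "rider-A")
    source.put("driver", "driver-B")

    // Project by position only, with no per-record field-name lookups
    // (this mirrors projectAvroUnsafe)
    val builder = new GenericRecordBuilder(projectedSchema)
    projectedSchema.getFields.asScala.zip(ordinals).foreach { case (field, pos) =>
      builder.set(field, source.get(pos))
    }
    val projected: GenericRecord = builder.build()
    println(projected) // {"uuid": "key-001", "rider": "rider-A"}
  }
}

Reusing a single GenericRecordBuilder across records, as the iterators in the patch do, works because each build() call materializes a fresh record from the builder's current field values; precomputing the ordinals once per split is what lets the per-record projection avoid field-name lookups entirely.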