Merged
37 commits
3660a0b
Rebased `HoodieUnsafeRDD` to become a trait;
Feb 23, 2022
7aa0b61
Refactored `HoodieMergeOnReadRDD` iterators to re-use iteration logic
Feb 23, 2022
390c47c
Tidying up
Feb 23, 2022
04590d3
Tidying up (cont)
Feb 23, 2022
fdbcc70
Cleaned up MOR Relation data-flows
Feb 23, 2022
09b1778
Tidying up
Feb 23, 2022
271f7f4
Tidying up
Feb 23, 2022
c926bb6
Adding missing docs
Feb 23, 2022
bb60abb
Decoupled Record Merging iterator from assuming particular schema of …
Feb 23, 2022
dc6f336
Removed superfluous `Option`, causing incorrect split processing
Feb 23, 2022
a90bf12
Added optimization to avoid full-schema base-file read in cases when
Feb 23, 2022
3132b10
Fixing compilation
Feb 24, 2022
45d8265
Replacing self-type restriction w/ sub-classing
Feb 24, 2022
db74831
Missing license
Feb 24, 2022
c1e7282
Fixing compilation
Feb 24, 2022
0e08367
After rebase fixes
Mar 11, 2022
6b036e6
Tidying up (after rebase)
Mar 17, 2022
3d80eab
Extracted `DeltaLogSupport` trait encapsulating reading from Delta Logs
Mar 18, 2022
a6a9831
Fixed Merging Record iterator incorrectly projecting merged record
Mar 18, 2022
5a7648f
Tidying up
Mar 18, 2022
f45e98d
Inlined `DeltaLogSupport` trait
Mar 18, 2022
98658f8
Fixing NPE
Mar 18, 2022
2f63d1d
Adjusted column projection tests to reflect the changes;
Mar 18, 2022
83e97b3
Tidying up
Mar 18, 2022
16e51f8
Allow non-partitioned table to rely on virtual-keys
Mar 18, 2022
f8ce13d
Fixed tests
Mar 18, 2022
65d67ed
Pushed down `mandatoryColumns` defs into individual Relations
Mar 18, 2022
b4b6d0f
Clarified `requiredKeyField` semantic
Mar 18, 2022
692d96d
Relaxed requirements to read only projected schema to only cases when…
Mar 18, 2022
e23b648
Fixed tests for Spark 2.x
Mar 18, 2022
762f555
Tidying up
Mar 18, 2022
16f6204
Fixed tests for Spark 3.x
Mar 18, 2022
f497df2
Revert some inadvertent changes
Mar 21, 2022
b146570
Clean up references to MT
Mar 22, 2022
73a645b
Tidying up
Mar 22, 2022
7bfcb40
Inlined `maxCompactionMemoryInBytes` w/in `HoodieMergeOnReadRDD`
Mar 25, 2022
54be92b
Tidying up
Mar 25, 2022
@@ -206,17 +206,6 @@ object AvroConversionUtils {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}

def buildAvroRecordBySchema(record: IndexedRecord,
requiredSchema: Schema,
requiredPos: Seq[Int],
recordBuilder: GenericRecordBuilder): GenericRecord = {
val requiredFields = requiredSchema.getFields.asScala
assert(requiredFields.length == requiredPos.length)
val positionIterator = requiredPos.iterator
requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
recordBuilder.build()
}

def getAvroRecordNameAndNamespace(tableName: String): (String, String) = {
val name = HoodieAvroUtils.sanitizeName(tableName)
(s"${name}_record", s"hoodie.${name}")
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
@@ -52,6 +52,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,

override type FileSplit = HoodieBaseFileSplit

override lazy val mandatoryColumns: Seq[String] =
Seq(recordKeyField)

protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable}
import org.apache.hudi.HoodieBaseRelation.getPartitionPath
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
@@ -32,8 +32,9 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -53,8 +54,12 @@ trait HoodieFileSplit {}

case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String)

case class HoodieTableState(recordKeyField: String,
preCombineFieldOpt: Option[String])
case class HoodieTableState(tablePath: String,
latestCommitTimestamp: String,
recordKeyField: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
recordPayloadClassName: String)
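
For illustration, the expanded state object can be populated as below; all values are hypothetical, and `OverwriteWithLatestAvroPayload` is used only as a typical payload class:

```scala
// Sketch only (hypothetical values): a per-query snapshot of the table's state
// that can be shipped to executors alongside the file splits.
val tableState = HoodieTableState(
  tablePath = "/warehouse/hudi/trips",              // hypothetical base path
  latestCommitTimestamp = "20220325143000",         // hypothetical latest commit instant
  recordKeyField = "_hoodie_record_key",            // meta fields enabled in this example
  preCombineFieldOpt = Some("ts"),
  usesVirtualKeys = false,
  recordPayloadClassName = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
)
```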

/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -78,13 +83,30 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

protected lazy val basePath: String = metaClient.getBasePath

// If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
// NOTE: This is historical behavior which is preserved as is
[Review comment, Member] same. can we avoid references to metadata table from this layer

[Reply, Contributor Author] This actually refers to metadata fields, not MT. Will amend

// NOTE: Record key-field is assumed singular here due to either of the following:
// - In case Hudi's meta fields are enabled: record key will be pre-materialized (stored) as part
// of the record's payload (as part of the Hudi's metadata)
// - In case Hudi's meta fields are disabled (virtual keys): in that case record has to bear _single field_
// identified as its (unique) primary key w/in its payload (this is a limitation of [[SimpleKeyGenerator]],
// which is the only [[KeyGenerator]] permitted for virtual-keys payloads)
protected lazy val recordKeyField: String =
if (tableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
else tableConfig.getRecordKeyFieldProp
if (tableConfig.populateMetaFields()) {
HoodieRecord.RECORD_KEY_METADATA_FIELD
} else {
val keyFields = tableConfig.getRecordKeyFields.get()
checkState(keyFields.length == 1)
keyFields.head
}
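
The resolution logic above can be summarized with a small standalone sketch (the configuration values are hypothetical; `_hoodie_record_key` is the value of `HoodieRecord.RECORD_KEY_METADATA_FIELD`):

```scala
// Sketch only: how the record-key field resolves in the two modes described above.
val populateMetaFields = false            // hypothetical: virtual keys are in use
val configuredKeyFields = Seq("uuid")     // hypothetical record-key configuration

val resolvedRecordKeyField =
  if (populateMetaFields) {
    "_hoodie_record_key"                  // pre-materialized Hudi meta field
  } else {
    // Virtual keys: exactly one key field is permitted (SimpleKeyGenerator limitation)
    require(configuredKeyFields.length == 1, "virtual keys require a single record-key field")
    configuredKeyFields.head
  }
```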

protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty
protected lazy val preCombineFieldOpt: Option[String] =
Option(tableConfig.getPreCombineField)
.orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
// NOTE: This is required to compensate for cases when empty string is used to stub
// property value to avoid it being set with the default value
// TODO(HUDI-3456) cleanup
case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
case _ => None
}
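
A minimal sketch of the precedence and empty-string handling described in the NOTE above (values are hypothetical):

```scala
// Sketch only: table config wins over write options, and an empty-string stub means "unset".
val fromTableConfig: Option[String]  = None       // hypothetical: not set in table config
val fromWriteOptions: Option[String] = Some("")   // hypothetical: stubbed with an empty string

val resolvedPreCombineFieldOpt = fromTableConfig.orElse(fromWriteOptions) match {
  case Some(f) if f != null && f.nonEmpty => Some(f)  // a real field name is kept
  case _                                  => None     // absent or stubbed: no pre-combine field
}
// resolvedPreCombineFieldOpt == None
```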

protected lazy val specifiedQueryTimestamp: Option[String] =
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
@@ -118,16 +140,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
FileStatusCache.getOrCreate(sparkSession))

/**
* Columns that the relation has to read from storage to properly execute its semantic: for example,
* for Merge-on-Read tables the key field as well as the pre-combine field comprise the mandatory set
* of columns, meaning that they will be fetched regardless of whether these columns are requested by
* the query, so that the relation is able to combine records properly (if necessary)
*
* @VisibleInTests
*/
lazy val mandatoryColumns: Seq[String] = {
if (isMetadataTable(metaClient)) {
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
} else {
// TODO this is MOR table requirement, not necessary for COW
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
}
val mandatoryColumns: Seq[String]
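
With the member now abstract, each concrete relation declares its own minimum column set. Below is a sketch of what a Merge-on-Read relation's override might look like, mirroring the previous default shown above (the exact relation it lands in is an assumption of this example):

```scala
// Sketch only: a MOR-style relation must always read the record key and, if present,
// the pre-combine field, so that base-file and log-file records can be merged.
override lazy val mandatoryColumns: Seq[String] =
  Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
```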

protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considered a "commit" operation
@@ -136,9 +156,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def latestInstant: Option[HoodieInstant] =
toScalaOption(timeline.lastInstant())

protected def queryTimestamp: Option[String] = {
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(i => i.getTimestamp))
}
protected def queryTimestamp: Option[String] =
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))

override def schema: StructType = tableStructSchema

@@ -257,15 +276,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
requestedColumns ++ missing
}

private def getPrecombineFieldProperty: Option[String] =
Option(tableConfig.getPreCombineField)
.orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
// NOTE: This is required to compensate for cases when empty string is used to stub
// property value to avoid it being set with the default value
// TODO(HUDI-3456) cleanup
case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
case _ => None
}
protected def getTableState: HoodieTableState = {
// Subset of the table's configuration state as of the time of the query
HoodieTableState(
tablePath = basePath,
latestCommitTimestamp = queryTimestamp.get,
recordKeyField = recordKeyField,
preCombineFieldOpt = preCombineFieldOpt,
usesVirtualKeys = !tableConfig.populateMetaFields(),
recordPayloadClassName = tableConfig.getPayloadClass
)
}

private def imbueConfigs(sqlContext: SQLContext): Unit = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
@@ -280,9 +301,6 @@ object HoodieBaseRelation {
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent

def isMetadataTable(metaClient: HoodieTableMetaClient): Boolean =
HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)

/**
* Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
* over [[InternalRow]]
@@ -65,28 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper {
}
}

/**
* Convert [[InternalRow]] to [[SpecificInternalRow]].
*/
def createInternalRowWithSchema(
row: InternalRow,
schema: StructType,
positions: Seq[Int]): InternalRow = {
val rowToReturn = new SpecificInternalRow(schema)
var curIndex = 0
schema.zip(positions).foreach { case (field, pos) =>
val curField = if (row.isNullAt(pos)) {
null
} else {
row.get(pos, field.dataType)
}
rowToReturn.update(curIndex, curField)
curIndex += 1
}
rowToReturn
}


def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
@@ -20,64 +20,15 @@ package org.apache.hudi

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}

case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit

/**
* TODO eval if we actually need it
*/
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[HoodieBaseFileSplit])
extends HoodieUnsafeRDD(sparkSession.sparkContext) {

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[InternalRow] with AutoCloseable {
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
private[this] var currentFile: PartitionedFile = _
private[this] var currentIterator: Iterator[InternalRow] = _

override def hasNext: Boolean = {
(currentIterator != null && currentIterator.hasNext) || nextIterator()
}

def next(): InternalRow = currentIterator.next()

/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
if (files.hasNext) {
currentFile = files.next()
logInfo(s"Reading File $currentFile")
currentIterator = readFunction(currentFile)

try {
hasNext
} catch {
case e: SchemaColumnConvertNotSupportedException =>
val message = "Parquet column cannot be converted in " +
s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
throw new QueryExecutionException(message, e)

case e => throw e
}
} else {
currentFile = null
false
}
}

override def close(): Unit = {}
}

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit](_ => iterator.close())

iterator.asInstanceOf[Iterator[InternalRow]]
}
extends FileScanRDD(sparkSession, readFunction, fileSplits.map(_.filePartition))
with HoodieUnsafeRDD {

override protected def getPartitions: Array[Partition] = fileSplits.map(_.filePartition).toArray
override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect()
}
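
A hedged usage sketch of the slimmed-down RDD: the `FilePartition`/`PartitionedFile` constructors below assume Spark 3.x signatures, `spark` is an existing `SparkSession`, and the read function is a stand-in for the one produced by `createBaseFileReader`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

// Stand-in read function: in practice this comes from createBaseFileReader.
val readFunction: PartitionedFile => Iterator[InternalRow] = _ => Iterator.empty

// One Hudi split per Spark file partition (Spark 3.x PartitionedFile signature assumed).
val file   = PartitionedFile(InternalRow.empty, "/tmp/hudi/part-00000.parquet", 0, 1024L)
val splits = Seq(HoodieBaseFileSplit(FilePartition(0, Array(file))))

// Partitioning, per-file iteration and task-completion handling are inherited from FileScanRDD.
val rdd = new HoodieFileScanRDD(spark, readFunction, splits)
```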