diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 3e7971b1b26f1..942a0aa097cd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -30,9 +31,6 @@
import java.util.Map;
import java.util.Properties;
-import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
-import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
-
/**
* {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue.
*
@@ -57,7 +55,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
return Option.empty();
}
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+ GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
@@ -81,7 +79,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
if (recordBytes.length == 0) {
return Option.empty();
}
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+ GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
@@ -123,4 +121,24 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
+
+ /**
+ * A wrapper around HoodieAvroUtils.getNestedFieldVal that catches exceptions from the underlying
+ * lookup and returns null when "returnNullIfNotFound" is true but could not otherwise take effect.
+ */
+ private static Object getNestedFieldVal(
+ GenericRecord record,
+ String fieldName,
+ boolean returnNullIfNotFound,
+ boolean consistentLogicalTimestampEnabled) {
+ try {
+ return HoodieAvroUtils.getNestedFieldVal(record, fieldName, returnNullIfNotFound, consistentLogicalTimestampEnabled);
+ } catch (Exception e) {
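+ // The underlying lookup can still throw, e.g. when the field is absent from the record schema
+ // after schema evolution; fall back to null only when the caller asked for it.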
+ if (returnNullIfNotFound) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 226fb01f43f90..ea9d5fc3f77bf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.{HoodieAvroSerializer, HoodieAvroDeserializer}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -36,6 +36,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import java.io.Closeable
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -54,15 +55,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val preCombineField = tableState.preCombineField
private val recordKeyFieldOpt = tableState.recordKeyFieldOpt
private val payloadProps = if (preCombineField.isDefined) {
- Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
+ HoodiePayloadConfig.newBuilder
+ .withPayloadOrderingField(preCombineField.get)
+ .build.getProps
} else {
- None
+ new Properties()
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
- read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+ val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+ extractRequiredSchema(rows)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
logFileIterator(logFileOnlySplit, getConfig)
case skipMergeSplit if skipMergeSplit.mergeType
@@ -118,6 +122,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
rows
}
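+ // Base files are now read with the full table schema; this projects each row down to the
+ // fields (and field order) of the required, pruned schema.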
+ private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
+ val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+ val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+ val requiredFieldPosition = tableState.requiredStructSchema
+ .map(f => tableAvroSchema.getField(f.name).pos()).toList
+ val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+ val rows = iter.map { row =>
+ unsafeProjection(createRowWithRequiredSchema(row, requiredFieldPosition))
+ }
+ rows
+ }
+
private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
@@ -188,7 +204,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@scala.annotation.tailrec
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
- recordToLoad = baseFileIterator.next()
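+ // Base file rows now carry the full table schema; project them to the required schema before emitting.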
+ val curRow = baseFileIterator.next()
+ recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
true
} else {
if (logRecordsKeyIterator.hasNext) {
@@ -272,7 +289,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
} else {
// No merge needed, load current row with required schema
- recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
+ recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
true
}
} else {
@@ -317,32 +334,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
- private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
- val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
- val posIterator = requiredFieldPosition.iterator
- var curIndex = 0
- tableState.requiredStructSchema.foreach(
- f => {
- val curPos = posIterator.next()
- val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType)
- rowToReturn.update(curIndex, curField)
- curIndex = curIndex + 1
- }
- )
- rowToReturn
- }
-
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
- if (payloadProps.isDefined) {
- logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord,
- tableAvroSchema, payloadProps.get)
- } else {
- logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
- }
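+ // payloadProps is always a concrete Properties now (empty when no preCombine field is set),
+ // so the props-aware overload can be called unconditionally.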
+ logRecords.get(curKey).getData.combineAndGetUpdateValue(
+ historyAvroRecord, tableAvroSchema, payloadProps)
}
}
-}
+
+ private def createRowWithRequiredSchema(row: InternalRow, requiredFieldPosition: Seq[Int]): InternalRow = {
+ val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
+ val posIterator = requiredFieldPosition.iterator
+ var curIndex = 0
+ tableState.requiredStructSchema.foreach(
+ f => {
+ val curPos = posIterator.next()
+ val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType)
+ rowToReturn.update(curIndex, curField)
+ curIndex = curIndex + 1
+ }
+ )
+ rowToReturn
+ }
+
+ }
private object HoodieMergeOnReadRDD {
val CONFIG_INSTANTIATION_LOCK = new Object()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1f2aae4119c55..81522f77de86d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,16 +19,18 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
+
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, HoodieTimelineTimeZone, WriteOperationType}
+import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
@@ -42,12 +44,14 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
+
import org.apache.log4j.LogManager
+
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql._
import org.apache.spark.SparkContext
import java.util.Properties
@@ -730,6 +734,11 @@ object HoodieSparkSqlWriter {
mergedParams(key) = value
}
}
+
+ // Use the preCombine field to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY so payloads can honor it when ordering records.
+ if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
+ mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
+ }
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index bc83a85415de2..9b96abd09c4ca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -145,7 +145,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
- requiredSchema = requiredStructSchema,
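+ // Read with the full table schema so fields such as the preCombine/ordering field stay available
+ // when merging with log records; HoodieMergeOnReadRDD prunes rows back to the required schema.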
+ requiredSchema = tableStructSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index c4d670bb62f8a..449d87b8acd4b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -131,7 +131,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
- requiredSchema = requiredStructSchema,
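+ // Read with the full table schema so fields such as the preCombine/ordering field stay available
+ // when merging with log records; HoodieMergeOnReadRDD prunes rows back to the required schema.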
+ requiredSchema = tableStructSchema,
filters = filters,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()