@@ -18,6 +18,7 @@

package org.apache.hudi.common.model;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

@@ -30,9 +31,6 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;

/**
* {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue.
* <p>
@@ -57,7 +55,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
return Option.empty();
}

GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);

// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
@@ -81,7 +79,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
if (recordBytes.length == 0) {
return Option.empty();
}
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);

return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
@@ -123,4 +121,24 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}

/**
* A wrapper around HoodieAvroUtils.getNestedFieldVal that catches exceptions and returns null
* when "returnNullIfNotFound" is true but the underlying lookup still throws, e.g. for a nested
* ordering field that is missing from a record written with an older schema.
*/
private static Object getNestedFieldVal(
GenericRecord record,
String fieldName,
boolean returnNullIfNotFound,
boolean consistentLogicalTimestampEnabled) {
try {
return HoodieAvroUtils.getNestedFieldVal(record, fieldName, returnNullIfNotFound, consistentLogicalTimestampEnabled);
} catch (Exception e) {
if (returnNullIfNotFound) {
return null;
} else {
throw e;
}
}
}
}
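Together with needUpdatingPersistedRecord above, the wrapper means a record persisted under an older schema simply yields a null ordering value instead of throwing, and the incoming record wins. A minimal sketch of that decision, written in Scala (not Hudi API) and assuming Long event-time ordering values; any Comparable behaves the same way:

  // Mirrors the comparison in needUpdatingPersistedRecord: a null persisted ordering value
  // (record written before the ordering column existed) lets the incoming record win;
  // otherwise the larger value wins, with ties going to the incoming side.
  object OrderingDecisionSketch extends App {
    def incomingWins(persisted: java.lang.Long, incoming: java.lang.Long): Boolean =
      persisted == null || persisted.compareTo(incoming) <= 0

    println(incomingWins(null, 5L)) // true:  no persisted ordering value, overwrite
    println(incomingWins(3L, 5L))   // true:  incoming carries the newer ordering value
    println(incomingWins(7L, 5L))   // false: keep the persisted record
  }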
@@ -28,14 +28,15 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{HoodieAvroSerializer, HoodieAvroDeserializer}
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}

import java.io.Closeable
import java.util.Properties

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -54,15 +55,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val preCombineField = tableState.preCombineField
private val recordKeyFieldOpt = tableState.recordKeyFieldOpt
private val payloadProps = if (preCombineField.isDefined) {
Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
HoodiePayloadConfig.newBuilder
.withPayloadOrderingField(preCombineField.get)
.build.getProps
} else {
None
new Properties()
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
extractRequiredSchema(rows)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
logFileIterator(logFileOnlySplit, getConfig)
case skipMergeSplit if skipMergeSplit.mergeType
@@ -118,6 +122,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
rows
}

private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {

Review comment (Member): the name could imply returning schema
Suggested change:
-    private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
+    private def toRowsWithRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {

val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)

Review comment (Member): this is not used?
Reply (Contributor, author): yes, i should remove this.

val requiredFieldPosition = tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
val rows = iter.map { row =>
unsafeProjection(createRowWithRequiredSchema(row, requiredFieldPosition))
}
rows
}

private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
@@ -188,7 +204,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@scala.annotation.tailrec
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
recordToLoad = baseFileIterator.next()
val curRow = baseFileIterator.next()
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
true
} else {
if (logRecordsKeyIterator.hasNext) {
@@ -272,7 +289,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
} else {
// No merge needed, load current row with required schema
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
true
}
} else {
@@ -317,32 +334,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}

private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
val posIterator = requiredFieldPosition.iterator
var curIndex = 0
tableState.requiredStructSchema.foreach(
f => {
val curPos = posIterator.next()
val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType)
rowToReturn.update(curIndex, curField)
curIndex = curIndex + 1
}
)
rowToReturn
}

private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
if (payloadProps.isDefined) {
logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord,
tableAvroSchema, payloadProps.get)
} else {
logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
}
logRecords.get(curKey).getData.combineAndGetUpdateValue(
historyAvroRecord, tableAvroSchema, payloadProps)
}
}
}

private def createRowWithRequiredSchema(row: InternalRow, requiredFieldPosition: Seq[Int]): InternalRow = {
val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
val posIterator = requiredFieldPosition.iterator
var curIndex = 0
tableState.requiredStructSchema.foreach(
f => {
val curPos = posIterator.next()
val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType)
rowToReturn.update(curIndex, curField)
curIndex = curIndex + 1
}
)
rowToReturn
}

}
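For readers following the two-step narrowing above (createRowWithRequiredSchema feeding an UnsafeProjection), here is a self-contained sketch with a made-up (id, name, ts) schema; only the shape of the code matches the RDD, while the schema, row values, and object name are illustrative:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
  import org.apache.spark.sql.types._
  import org.apache.spark.unsafe.types.UTF8String

  object RequiredSchemaProjectionSketch extends App {
    // Full table schema as read from the base file, and the narrower required schema.
    val fullSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("ts", LongType)))
    val requiredSchema = StructType(Seq(fullSchema("id"), fullSchema("ts")))
    // Position of each required field inside a full-schema row.
    val requiredFieldPosition = requiredSchema.map(f => fullSchema.fieldIndex(f.name))

    // A row as the base file reader would return it under the full table schema.
    val fullRow: InternalRow = InternalRow(UTF8String.fromString("k1"), UTF8String.fromString("a"), 42L)

    // Step 1: copy only the required positions into a mutable row of the required schema.
    val narrowed = new SpecificInternalRow(requiredSchema)
    requiredFieldPosition.zipWithIndex.foreach { case (srcPos, dstIdx) =>
      if (fullRow.isNullAt(srcPos)) narrowed.setNullAt(dstIdx)
      else narrowed.update(dstIdx, fullRow.get(srcPos, requiredSchema(dstIdx).dataType))
    }
    // Step 2: project to an UnsafeRow in the required column order.
    val projected = UnsafeProjection.create(requiredSchema).apply(narrowed)
    println(projected.numFields) // 2, holding ("k1", 42L)
  }

This also appears to be why the two relations further down now pass tableStructSchema as requiredSchema: the base file is read with the full table schema so the log-record merge can see every column, and the narrowing to the required columns happens here at the end.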

private object HoodieMergeOnReadRDD {
val CONFIG_INSTANTIATION_LOCK = new Object()
@@ -19,16 +19,18 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, HoodieTimelineTimeZone, WriteOperationType}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
@@ -42,12 +44,14 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner

import org.apache.log4j.LogManager

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.apache.spark.SparkContext

import java.util.Properties
@@ -730,6 +734,11 @@ object HoodieSparkSqlWriter {
mergedParams(key) = value
}
}

// use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
}
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
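For context, a hedged sketch of the property flow this block completes: the write side copies the preCombine field into the payload ordering property, mirroring what payloadProps does in HoodieMergeOnReadRDD on the read side. The literal config keys below stand in for PRECOMBINE_FIELD.key() and HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY and should be treated as assumptions:

  import java.util.Properties
  import scala.collection.mutable

  object OrderingPropFlowSketch extends App {
    // Write side (this file): a configured precombine field also fills the ordering property.
    val mergedParams = mutable.Map("hoodie.datasource.write.precombine.field" -> "ts")
    mergedParams.put("hoodie.payload.ordering.field",
      mergedParams("hoodie.datasource.write.precombine.field"))

    // Read side (HoodieMergeOnReadRDD above): the same key reaches combineAndGetUpdateValue
    // through payloadProps; with no precombine field an empty Properties is passed instead
    // and the payload effectively falls back to latest-record-wins.
    val payloadProps = new Properties()
    mergedParams.get("hoodie.payload.ordering.field")
      .foreach(payloadProps.setProperty("hoodie.payload.ordering.field", _))
    println(payloadProps) // {hoodie.payload.ordering.field=ts}
  }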
@@ -145,7 +145,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = requiredStructSchema,
requiredSchema = tableStructSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
@@ -131,7 +131,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = requiredStructSchema,
requiredSchema = tableStructSchema,
filters = filters,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()