diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 7d96689ae0c15..598059c030d52 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -40,7 +40,7 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
 import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
+import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -310,16 +310,16 @@ class HoodieCDCRDD(
           case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER =>
             recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
             val before = record.get(2).asInstanceOf[GenericRecord]
-            recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
+            recordToLoad.update(2, recordToJsonAsUTF8String(before))
             val after = record.get(3).asInstanceOf[GenericRecord]
-            recordToLoad.update(3, convertToUTF8String(HoodieCDCUtils.recordToJson(after)))
+            recordToLoad.update(3, recordToJsonAsUTF8String(after))
           case HoodieCDCSupplementalLoggingMode.WITH_BEFORE =>
             val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
             val op = row.getString(0)
             val recordKey = row.getString(1)
             recordToLoad.update(0, convertToUTF8String(op))
             val before = record.get(2).asInstanceOf[GenericRecord]
-            recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
+            recordToLoad.update(2, recordToJsonAsUTF8String(before))
             parse(op) match {
               case INSERT =>
                 recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
@@ -338,10 +338,10 @@ class HoodieCDCRDD(
                 recordToLoad.update(2, null)
                 recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
               case UPDATE =>
-                recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
+                recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
                 recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
               case _ =>
-                recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
+                recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
                 recordToLoad.update(3, null)
             }
         }
@@ -355,7 +355,7 @@ class HoodieCDCRDD(
     }
 
     /**
-     * Load the next log record, and judege how to convert it to cdc format.
+     * Load the next log record, and judge how to convert it to cdc format.
      */
     private def loadNextLogRecord(): Boolean = {
       var loaded = false
@@ -370,7 +370,7 @@ class HoodieCDCRDD(
         } else {
           // there is a real record deleted.
           recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
-          recordToLoad.update(2, convertRowToJsonString(deserialize(existingRecordOpt.get)))
+          recordToLoad.update(2, recordToJsonAsUTF8String(existingRecordOpt.get))
           recordToLoad.update(3, null)
           loaded = true
         }
@@ -378,25 +378,24 @@ class HoodieCDCRDD(
         val existingRecordOpt = beforeImageRecords.get(key)
         if (existingRecordOpt.isEmpty) {
           // a new record is inserted.
-          val insertedRecord = convertIndexedRecordToRow(indexedRecord.get)
+          val insertedRecord = projectAvroUnsafe(indexedRecord.get)
           recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
           recordToLoad.update(2, null)
-          recordToLoad.update(3, convertRowToJsonString(insertedRecord))
+          recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
           // insert into beforeImageRecords
-          beforeImageRecords(key) = serialize(insertedRecord)
+          beforeImageRecords(key) = insertedRecord
           loaded = true
         } else {
           // a existed record is updated.
           val existingRecord = existingRecordOpt.get
           val merged = merge(existingRecord, logRecord)
-          val mergeRow = convertIndexedRecordToRow(merged)
-          val existingRow = deserialize(existingRecord)
-          if (mergeRow != existingRow) {
+          val mergeRecord = projectAvroUnsafe(merged)
+          if (existingRecord != mergeRecord) {
             recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
-            recordToLoad.update(2, convertRowToJsonString(existingRow))
-            recordToLoad.update(3, convertRowToJsonString(mergeRow))
+            recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
+            recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
             // update into beforeImageRecords
-            beforeImageRecords(key) = serialize(mergeRow)
+            beforeImageRecords(key) = mergeRecord
             loaded = true
           }
         }
@@ -512,7 +511,9 @@ class HoodieCDCRDD(
       val iter = loadFileSlice(fileSlice)
      iter.foreach { row =>
         val key = getRecordKey(row)
-        beforeImageRecords.put(key, serialize(row))
+        // Due to the reuse buffer mechanism of Spark serialization,
+        // we have to copy the serialized result if we need to retain its reference
+        beforeImageRecords.put(key, serialize(row, copy = true))
       }
       // reset beforeImageFiles
       beforeImageFiles.clear()
@@ -577,8 +578,17 @@ class HoodieCDCRDD(
       p.toUri.toString
     }
 
-    private def serialize(curRowRecord: InternalRow): GenericRecord = {
-      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
+    private def serialize(curRowRecord: InternalRow, copy: Boolean = false): GenericRecord = {
+      val record = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
+      if (copy) {
+        GenericData.get().deepCopy(record.getSchema, record)
+      } else {
+        record
+      }
+    }
+
+    private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
+      convertToUTF8String(HoodieCDCUtils.recordToJson(record))
     }
 
     private def getRecordKey(row: InternalRow): String = {
@@ -595,11 +605,9 @@ class HoodieCDCRDD(
       toScalaOption(record.getData.getInsertValue(avroSchema, payloadProps))
     }
 
-    private def convertIndexedRecordToRow(record: IndexedRecord): InternalRow = {
-      deserialize(
-        LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
-          avroSchema, reusableRecordBuilder)
-      )
+    private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = {
+      LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
+        avroSchema, reusableRecordBuilder)
     }
 
     private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): IndexedRecord = {
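Note (editorial illustration, not part of the patch): the new `copy = true` path in `serialize` exists because, per the patch's own comment, Spark's serializer may reuse one mutable output buffer across calls, so a `GenericRecord` cached in `beforeImageRecords` must first be detached via `GenericData.deepCopy`. Below is a minimal standalone sketch of that aliasing hazard using only the Avro API; `reusedBuffer` is a hypothetical stand-in for the serializer's reused buffer, not Hudi code.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

object DeepCopyAliasingDemo {
  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(
      """{"type":"record","name":"Rec","fields":[{"name":"id","type":"string"}]}""")

    // Stand-in for a serializer's reused output buffer (hypothetical).
    val reusedBuffer = new GenericData.Record(schema)

    reusedBuffer.put("id", "first")
    val cachedAlias = reusedBuffer                                     // shares the mutable buffer
    val cachedCopy = GenericData.get().deepCopy(schema, reusedBuffer)  // detached snapshot

    reusedBuffer.put("id", "second")  // the next "serialize" call overwrites the buffer

    println(cachedAlias.get("id"))  // second -- the aliased cache entry was clobbered
    println(cachedCopy.get("id"))   // first  -- the deep-copied entry is intact
  }
}
```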