Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -310,16 +310,16 @@ class HoodieCDCRDD(
case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER =>
recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
recordToLoad.update(2, recordToJsonAsUTF8String(before))
val after = record.get(3).asInstanceOf[GenericRecord]
recordToLoad.update(3, convertToUTF8String(HoodieCDCUtils.recordToJson(after)))
recordToLoad.update(3, recordToJsonAsUTF8String(after))
case HoodieCDCSupplementalLoggingMode.WITH_BEFORE =>
val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
val op = row.getString(0)
val recordKey = row.getString(1)
recordToLoad.update(0, convertToUTF8String(op))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, convertToUTF8String(HoodieCDCUtils.recordToJson(before)))
recordToLoad.update(2, recordToJsonAsUTF8String(before))
parse(op) match {
case INSERT =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
Expand All @@ -338,10 +338,10 @@ class HoodieCDCRDD(
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
case UPDATE =>
recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
case _ =>
recordToLoad.update(2, convertRowToJsonString(deserialize(beforeImageRecords(recordKey))))
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, null)
}
}
Expand All @@ -355,7 +355,7 @@ class HoodieCDCRDD(
}

/**
* Load the next log record, and judege how to convert it to cdc format.
* Load the next log record, and judge how to convert it to cdc format.
*/
private def loadNextLogRecord(): Boolean = {
var loaded = false
Expand All @@ -370,33 +370,32 @@ class HoodieCDCRDD(
} else {
// there is a real record deleted.
recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
recordToLoad.update(2, convertRowToJsonString(deserialize(existingRecordOpt.get)))
recordToLoad.update(2, recordToJsonAsUTF8String(existingRecordOpt.get))
recordToLoad.update(3, null)
loaded = true
}
} else {
val existingRecordOpt = beforeImageRecords.get(key)
if (existingRecordOpt.isEmpty) {
// a new record is inserted.
val insertedRecord = convertIndexedRecordToRow(indexedRecord.get)
val insertedRecord = projectAvroUnsafe(indexedRecord.get)
recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(insertedRecord))
recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
// insert into beforeImageRecords
beforeImageRecords(key) = serialize(insertedRecord)
beforeImageRecords(key) = insertedRecord
loaded = true
} else {
// a existed record is updated.
val existingRecord = existingRecordOpt.get
val merged = merge(existingRecord, logRecord)
val mergeRow = convertIndexedRecordToRow(merged)
val existingRow = deserialize(existingRecord)
if (mergeRow != existingRow) {
val mergeRecord = projectAvroUnsafe(merged)
if (existingRecord != mergeRecord) {
recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
recordToLoad.update(2, convertRowToJsonString(existingRow))
recordToLoad.update(3, convertRowToJsonString(mergeRow))
recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
// update into beforeImageRecords
beforeImageRecords(key) = serialize(mergeRow)
beforeImageRecords(key) = mergeRecord
loaded = true
}
}
Expand Down Expand Up @@ -512,7 +511,9 @@ class HoodieCDCRDD(
val iter = loadFileSlice(fileSlice)
iter.foreach { row =>
val key = getRecordKey(row)
beforeImageRecords.put(key, serialize(row))
// Due to the reuse buffer mechanism of Spark serialization,
// we have to copy the serialized result if we need to retain its reference
beforeImageRecords.put(key, serialize(row, copy = true))
}
// reset beforeImageFiles
beforeImageFiles.clear()
Expand Down Expand Up @@ -577,8 +578,17 @@ class HoodieCDCRDD(
p.toUri.toString
}

private def serialize(curRowRecord: InternalRow): GenericRecord = {
serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
private def serialize(curRowRecord: InternalRow, copy: Boolean = false): GenericRecord = {
val record = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
if (copy) {
GenericData.get().deepCopy(record.getSchema, record)
} else {
record
}
}

private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
convertToUTF8String(HoodieCDCUtils.recordToJson(record))
}

private def getRecordKey(row: InternalRow): String = {
Expand All @@ -595,11 +605,9 @@ class HoodieCDCRDD(
toScalaOption(record.getData.getInsertValue(avroSchema, payloadProps))
}

private def convertIndexedRecordToRow(record: IndexedRecord): InternalRow = {
deserialize(
LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
)
private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = {
LogFileIterator.projectAvroUnsafe(record.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
}

private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): IndexedRecord = {
Expand Down