@@ -50,6 +50,7 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 
 import java.io.Closeable
 import java.util.Properties
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
 
@@ -188,17 +189,23 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
       logRecords.remove(key)
 
-    override def hasNext: Boolean =
+    override def hasNext: Boolean = hasNextInternal
+
+    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
+    //       that recursion is unfolded into a loop to avoid stack overflows while
+    //       handling records
+    @tailrec private def hasNextInternal: Boolean = {
       logRecordsIterator.hasNext && {
         val avroRecordOpt = logRecordsIterator.next()
         if (avroRecordOpt.isEmpty) {
           // Record has been deleted, skipping
-          this.hasNext
+          this.hasNextInternal
         } else {
           recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
           true
         }
       }
+    }
 
     override final def next(): InternalRow = recordToLoad
 
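The same refactoring is applied to the record-merging iterator in the next hunk. To see why the change works, here is a minimal, self-contained sketch of the pattern (all names in it, such as SkippingIterator, are illustrative and not part of Hudi's API): an overridable hasNext can never be tail-call optimized by scalac, so the recursion is moved into a private helper, where @tailrec both enables the optimization and makes the compiler verify it.

import scala.annotation.tailrec

// Hypothetical stand-in for Hudi's log-record iterator: the underlying
// iterator yields None for deleted records, which hasNext must skip over.
class SkippingIterator(underlying: Iterator[Option[String]]) extends Iterator[String] {
  private var recordToLoad: String = _

  override def hasNext: Boolean = hasNextInternal

  // The self-call sits in tail position (the right-hand operand of &&, which
  // scalac rewrites into a conditional), so @tailrec compiles it into a loop.
  @tailrec
  private def hasNextInternal: Boolean =
    underlying.hasNext && {
      underlying.next() match {
        case None =>
          // Record has been deleted, keep scanning
          hasNextInternal
        case Some(value) =>
          recordToLoad = value
          true
      }
    }

  override def next(): String = recordToLoad
}

Note that annotating the original hasNext directly would not compile: @tailrec requires a method that cannot be overridden (private or final), which is exactly why the change delegates to hasNextInternal.
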
@@ -257,7 +264,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
     private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
 
-    override def hasNext: Boolean = {
+    override def hasNext: Boolean = hasNextInternal
+
+    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
+    //       that recursion is unfolded into a loop to avoid stack overflows while
+    //       handling records
+    @tailrec private def hasNextInternal: Boolean = {
       if (baseFileIterator.hasNext) {
         val curRowRecord = baseFileIterator.next()
         val curKey = curRowRecord.getString(recordKeyOrdinal)
@@ -270,7 +282,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
         val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
         if (mergedAvroRecordOpt.isEmpty) {
           // Record has been deleted, skipping
-          this.hasNext
+          this.hasNextInternal
         } else {
           // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
           //       record from the Delta Log will bear (full) Table schema, while record from the Base file
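
For context on the failure mode being fixed: a long run of consecutive deleted records previously added one stack frame per skipped record, eventually overflowing the stack. A hypothetical repro against the illustrative SkippingIterator sketched above (not code from this PR):

// Assumed driver code: one million deleted entries followed by a single
// live record.
val manyDeletes = Iterator.fill(1000000)(Option.empty[String]) ++ Iterator(Some("live"))
val it = new SkippingIterator(manyDeletes)
assert(it.hasNext)           // scans past 1,000,000 deletes in constant stack space
assert(it.next() == "live")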